044-网络爬虫系统设计

学习目标

通过本章学习,你将掌握:

  • 网络爬虫的基本原理和架构设计
  • 高性能爬虫系统的实现方法
  • 反爬虫机制的应对策略
  • 分布式爬虫系统的设计与实现
  • 数据存储和处理的最佳实践
  • 爬虫监控和运维管理

1. 项目概述

1.1 背景介绍

网络爬虫是一种自动化程序,用于从互联网上收集和提取数据。在大数据时代,爬虫技术已成为数据获取的重要手段,广泛应用于搜索引擎、数据分析、市场研究、价格监控等领域。

1.2 技术架构

系统采用模块化设计,主要组件如下:

  • 调度器:维护URL队列,按优先级分发请求
  • 下载器:发起HTTP请求并获取响应
  • 解析器:从响应中提取结构化数据与新链接
  • 数据管道与数据存储:对数据进行校验、清洗、去重并持久化
  • 反爬虫处理:代理池、请求限制(限速)等应对策略
  • 分布式组件:Redis队列、消息队列、分布式锁
  • 监控系统:指标采集、告警与监控面板
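
下面给出一个极简的组件协作示意,展示一次抓取的典型数据流(假设使用本章后续定义的 Scheduler、Downloader、ParserManager 与 PipelineManager 接口,spider_name 等参数仅为示例):

# 一次抓取的典型流程:调度器取请求 -> 下载器获取响应 -> 解析器产出数据项 -> 管道处理存储
async def crawl_once(scheduler, downloader, parser_manager, pipeline_manager, parser_config):
    request = scheduler.get_request()          # 可能因并发/延迟限制返回None
    if request is None:
        return

    response = await downloader.download(request)
    if response is None:
        scheduler.request_finished(request, success=False)   # 触发重试逻辑
        return

    for item in parser_manager.parse_response(response, parser_config):
        await pipeline_manager.process_item(item, spider_name='example')

    scheduler.request_finished(request, success=True)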

1.3 项目结构

import os
import json
from typing import Dict, List
from pathlib import Path

class CrawlerProjectStructure:
    """
    爬虫项目结构生成器
    """
    
    def __init__(self):
        self.structure = {
            'crawler/': {
                '__init__.py': '',
                'core/': {
                    '__init__.py': '',
                    'scheduler.py': '# 调度器模块',
                    'downloader.py': '# 下载器模块',
                    'parser.py': '# 解析器模块',
                    'pipeline.py': '# 数据管道模块',
                    'middleware.py': '# 中间件模块'
                },
                'utils/': {
                    '__init__.py': '',
                    'proxy.py': '# 代理管理',
                    'user_agent.py': '# User-Agent管理',
                    'captcha.py': '# 验证码处理',
                    'bloom_filter.py': '# 布隆过滤器',
                    'rate_limiter.py': '# 速率限制器'
                },
                'spiders/': {
                    '__init__.py': '',
                    'base_spider.py': '# 基础爬虫类',
                    'example_spider.py': '# 示例爬虫'
                },
                'storage/': {
                    '__init__.py': '',
                    'database.py': '# 数据库操作',
                    'file_storage.py': '# 文件存储',
                    'cache.py': '# 缓存管理'
                },
                'monitoring/': {
                    '__init__.py': '',
                    'metrics.py': '# 指标收集',
                    'alerts.py': '# 告警系统',
                    'dashboard.py': '# 监控面板'
                }
            },
            'config/': {
                'settings.py': '# 配置文件',
                'logging.conf': '# 日志配置',
                'redis.conf': '# Redis配置',
                'proxy_list.txt': '# 代理列表'
            },
            'tests/': {
                '__init__.py': '',
                'test_downloader.py': '# 下载器测试',
                'test_parser.py': '# 解析器测试',
                'test_pipeline.py': '# 管道测试'
            },
            'scripts/': {
                'start_crawler.py': '# 启动脚本',
                'deploy.py': '# 部署脚本',
                'monitor.py': '# 监控脚本'
            },
            'docker/': {
                'Dockerfile': '# Docker配置',
                'docker-compose.yml': '# Docker Compose配置',
                'requirements.txt': '# Python依赖'
            },
            'logs/': {},
            'data/': {},
            'README.md': '# 项目说明',
            'requirements.txt': '# 项目依赖'
        }
    
    def create_structure(self, base_path: str):
        """
        创建项目结构
        """
        base_path = Path(base_path)
        base_path.mkdir(parents=True, exist_ok=True)
        
        self._create_directory_structure(base_path, self.structure)
        self._create_config_files(base_path)
        self._create_requirements_file(base_path)
        
        print(f"爬虫项目结构已创建: {base_path}")
    
    def _create_directory_structure(self, base_path: Path, structure: Dict):
        """
        递归创建目录结构
        """
        for name, content in structure.items():
            path = base_path / name
            
            if name.endswith('/'):
                # 创建目录
                path.mkdir(exist_ok=True)
                if isinstance(content, dict):
                    self._create_directory_structure(path, content)
            else:
                # 创建文件
                if isinstance(content, str):
                    path.write_text(content, encoding='utf-8')
    
    def _create_config_files(self, base_path: Path):
        """
        创建配置文件
        """
        # 创建设置文件
        settings_content = '''
# 爬虫配置文件

# 基础设置
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
DOWNLOAD_DELAY = 1
CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8

# 重试设置
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]

# 代理设置
PROXY_ENABLED = True
PROXY_ROTATION = True

# 数据库设置
DATABASE_URL = 'postgresql://user:password@localhost:5432/crawler_db'
REDIS_URL = 'redis://localhost:6379/0'

# 监控设置
MONITORING_ENABLED = True
METRICS_PORT = 8080
'''
        (base_path / 'config' / 'settings.py').write_text(settings_content, encoding='utf-8')
        
        # 创建Docker配置
        dockerfile_content = '''
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "scripts/start_crawler.py"]
'''
        (base_path / 'docker' / 'Dockerfile').write_text(dockerfile_content, encoding='utf-8')
        
        # 创建docker-compose配置
        compose_content = '''
version: '3.8'

services:
  crawler:
    build:
      context: ..
      dockerfile: docker/Dockerfile
    depends_on:
      - redis
      - postgres
    environment:
      - REDIS_URL=redis://redis:6379/0
      - DATABASE_URL=postgresql://crawler:password@postgres:5432/crawler_db
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
  
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
  
  postgres:
    image: postgres:13
    environment:
      - POSTGRES_DB=crawler_db
      - POSTGRES_USER=crawler
      - POSTGRES_PASSWORD=password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:
'''
        (base_path / 'docker' / 'docker-compose.yml').write_text(compose_content, encoding='utf-8')
    
    def _create_requirements_file(self, base_path: Path):
        """
        创建依赖文件
        """
        requirements = [
            'requests>=2.28.0',
            'aiohttp>=3.8.0',
            'beautifulsoup4>=4.11.0',
            'lxml>=4.9.0',
            'scrapy>=2.6.0',
            'selenium>=4.5.0',
            'redis>=4.3.0',
            'sqlalchemy>=1.4.0',
            'psycopg2-binary>=2.9.0',
            'pymongo>=4.2.0',
            'celery>=5.2.0',
            'flower>=1.2.0',
            'prometheus-client>=0.14.0',
            'fake-useragent>=1.2.0',
            'pybloom-live>=3.1.0',
            'pillow>=9.2.0',
            'opencv-python>=4.6.0',
            'pytesseract>=0.3.10',
            'undetected-chromedriver>=3.4.0',
            'pandas>=1.5.0',
            'numpy>=1.23.0'
        ]
        
        (base_path / 'requirements.txt').write_text('\n'.join(requirements), encoding='utf-8')
        (base_path / 'docker' / 'requirements.txt').write_text('\n'.join(requirements), encoding='utf-8')
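
生成器的用法很简单,以下示例在当前目录下创建完整的项目骨架(路径 ./crawler_project 仅为示例):

if __name__ == '__main__':
    generator = CrawlerProjectStructure()
    generator.create_structure('./crawler_project')   # 输出路径仅为示例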

2. 核心组件设计

2.1 调度器设计

import asyncio
import time
import logging
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass
from enum import Enum
from queue import PriorityQueue
import redis
import json
from urllib.parse import urljoin, urlparse

class Priority(Enum):
    """
    请求优先级
    """
    LOW = 3
    NORMAL = 2
    HIGH = 1
    URGENT = 0

@dataclass
class Request:
    """
    请求对象
    """
    url: str
    method: str = 'GET'
    headers: Dict = None
    data: Dict = None
    priority: Priority = Priority.NORMAL
    retry_count: int = 0
    max_retries: int = 3
    delay: float = 0
    callback: str = None
    meta: Dict = None
    timestamp: float = None
    
    def __post_init__(self):
        if self.headers is None:
            self.headers = {}
        if self.meta is None:
            self.meta = {}
        if self.timestamp is None:
            self.timestamp = time.time()
    
    def __lt__(self, other):
        return self.priority.value < other.priority.value

class Scheduler:
    """
    爬虫调度器
    """
    
    def __init__(self, redis_url: str = 'redis://localhost:6379/0',
                 max_concurrent: int = 100):
        self.redis_client = redis.from_url(redis_url)
        self.max_concurrent = max_concurrent
        self.running_requests = 0
        self.request_queue = PriorityQueue()
        self.seen_urls = set()
        self.domain_delays = {}
        self.logger = logging.getLogger(self.__class__.__name__)
        
        # Redis键名
        self.queue_key = 'crawler:requests'
        self.seen_key = 'crawler:seen_urls'
        self.stats_key = 'crawler:stats'
        
        # 统计信息
        self.stats = {
            'requests_total': 0,
            'requests_success': 0,
            'requests_failed': 0,
            'requests_pending': 0
        }
    
    def add_request(self, request: Request, dont_filter: bool = False) -> bool:
        """
        添加请求到队列
        """
        # URL去重(重试等场景可通过dont_filter跳过)
        if not dont_filter and self._is_duplicate(request.url):
            self.logger.debug(f"重复URL: {request.url}")
            return False
        
        # 添加到Redis队列
        request_data = {
            'url': request.url,
            'method': request.method,
            'headers': request.headers,
            'data': request.data,
            'priority': request.priority.value,
            'retry_count': request.retry_count,
            'max_retries': request.max_retries,
            'delay': request.delay,
            'callback': request.callback,
            'meta': request.meta,
            'timestamp': request.timestamp
        }
        
        # 使用优先级作为分数
        self.redis_client.zadd(
            self.queue_key,
            {json.dumps(request_data): request.priority.value}
        )
        
        # 标记URL为已见
        self._mark_seen(request.url)
        
        # 更新统计
        self.stats['requests_total'] += 1
        self.stats['requests_pending'] += 1
        self._update_stats()
        
        self.logger.info(f"添加请求: {request.url}")
        return True
    
    def get_request(self) -> Optional[Request]:
        """
        从队列获取请求
        """
        # 检查并发限制
        if self.running_requests >= self.max_concurrent:
            return None
        
        # 从Redis获取最高优先级的请求
        result = self.redis_client.zpopmin(self.queue_key, 1)
        if not result:
            return None
        
        request_json, priority = result[0]
        request_data = json.loads(request_json)
        
        # 检查域名延迟
        domain = urlparse(request_data['url']).netloc
        if self._should_delay(domain, request_data['delay']):
            # 重新放回队列
            self.redis_client.zadd(
                self.queue_key,
                {request_json: priority}
            )
            return None
        
        # 创建Request对象
        request = Request(
            url=request_data['url'],
            method=request_data['method'],
            headers=request_data['headers'],
            data=request_data['data'],
            priority=Priority(request_data['priority']),
            retry_count=request_data['retry_count'],
            max_retries=request_data['max_retries'],
            delay=request_data['delay'],
            callback=request_data['callback'],
            meta=request_data['meta'],
            timestamp=request_data['timestamp']
        )
        
        self.running_requests += 1
        self.stats['requests_pending'] -= 1
        self._update_domain_delay(domain)
        self._update_stats()
        
        return request
    
    def request_finished(self, request: Request, success: bool = True):
        """
        标记请求完成
        """
        self.running_requests -= 1
        
        if success:
            self.stats['requests_success'] += 1
        else:
            self.stats['requests_failed'] += 1
            
            # 重试逻辑
            if request.retry_count < request.max_retries:
                request.retry_count += 1
                request.delay = min(max(request.delay * 2, 1), 60)  # 指数退避,至少1秒
                self.add_request(request, dont_filter=True)  # 跳过去重,否则已见URL无法重试
                self.logger.info(f"重试请求: {request.url} (第{request.retry_count}次)")
        
        self._update_stats()
    
    def _is_duplicate(self, url: str) -> bool:
        """
        检查URL是否重复
        """
        return self.redis_client.sismember(self.seen_key, url)
    
    def _mark_seen(self, url: str):
        """
        标记URL为已见
        """
        self.redis_client.sadd(self.seen_key, url)
    
    def _should_delay(self, domain: str, min_delay: float) -> bool:
        """
        检查是否需要延迟
        """
        current_time = time.time()
        last_request_time = self.domain_delays.get(domain, 0)
        
        return (current_time - last_request_time) < min_delay
    
    def _update_domain_delay(self, domain: str):
        """
        更新域名最后请求时间
        """
        self.domain_delays[domain] = time.time()
    
    def _update_stats(self):
        """
        更新统计信息到Redis
        """
        # hmset在新版redis-py中已弃用,改用hset的mapping参数
        self.redis_client.hset(self.stats_key, mapping=self.stats)
    
    def get_stats(self) -> Dict:
        """
        获取统计信息
        """
        return self.stats.copy()
    
    def clear_queue(self):
        """
        清空队列
        """
        self.redis_client.delete(self.queue_key)
        self.redis_client.delete(self.seen_key)
        self.stats = {
            'requests_total': 0,
            'requests_success': 0,
            'requests_failed': 0,
            'requests_pending': 0
        }
        self._update_stats()
        self.logger.info("队列已清空")
    
    def get_queue_size(self) -> int:
        """
        获取队列大小
        """
        return self.redis_client.zcard(self.queue_key)
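
以下是调度器的基本用法示意(假设本地已运行 Redis,URL 仅为示例):

scheduler = Scheduler(redis_url='redis://localhost:6379/0', max_concurrent=50)
scheduler.add_request(Request(url='https://example.com/page/1',
                              priority=Priority.HIGH, delay=1.0))

req = scheduler.get_request()      # 受并发数与域名延迟限制,可能返回None
if req:
    # ...交给下载器处理...
    scheduler.request_finished(req, success=True)

print(scheduler.get_stats(), scheduler.get_queue_size())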

2.2 下载器设计

import aiohttp
import asyncio
import time
import random
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass
import logging
from fake_useragent import UserAgent
from urllib.parse import urljoin

@dataclass
class Response:
    """
    响应对象
    """
    url: str
    status: int
    headers: Dict
    content: bytes
    text: str
    encoding: str
    request: Request
    meta: Dict = None
    
    def __post_init__(self):
        if self.meta is None:
            self.meta = {}

class ProxyManager:
    """
    代理管理器
    """
    
    def __init__(self, proxy_list: List[str] = None):
        self.proxy_list = proxy_list or []
        self.failed_proxies = set()
        self.proxy_stats = {}
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def get_proxy(self) -> Optional[str]:
        """
        获取可用代理
        """
        available_proxies = [
            proxy for proxy in self.proxy_list 
            if proxy not in self.failed_proxies
        ]
        
        if not available_proxies:
            return None
        
        return random.choice(available_proxies)
    
    def mark_proxy_failed(self, proxy: str):
        """
        标记代理失败
        """
        self.failed_proxies.add(proxy)
        self.logger.warning(f"代理失败: {proxy}")
    
    def mark_proxy_success(self, proxy: str):
        """
        标记代理成功
        """
        if proxy in self.failed_proxies:
            self.failed_proxies.remove(proxy)
        
        # 更新统计
        if proxy not in self.proxy_stats:
            self.proxy_stats[proxy] = {'success': 0, 'failed': 0}
        self.proxy_stats[proxy]['success'] += 1
    
    def reset_failed_proxies(self):
        """
        重置失败代理列表
        """
        self.failed_proxies.clear()
        self.logger.info("已重置失败代理列表")

class UserAgentManager:
    """
    User-Agent管理器
    """
    
    def __init__(self):
        self.ua = UserAgent()
        self.custom_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]
    
    def get_random_user_agent(self) -> str:
        """
        获取随机User-Agent
        """
        try:
            return self.ua.random
        except Exception:
            return random.choice(self.custom_agents)

class Downloader:
    """
    异步下载器
    """
    
    def __init__(self, 
                 concurrent_limit: int = 100,
                 timeout: int = 30,
                 proxy_manager: ProxyManager = None,
                 user_agent_manager: UserAgentManager = None):
        
        self.concurrent_limit = concurrent_limit
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.proxy_manager = proxy_manager or ProxyManager()
        self.ua_manager = user_agent_manager or UserAgentManager()
        self.session = None
        self.semaphore = asyncio.Semaphore(concurrent_limit)
        self.logger = logging.getLogger(self.__class__.__name__)
        
        # 下载统计
        self.stats = {
            'downloads_total': 0,
            'downloads_success': 0,
            'downloads_failed': 0,
            'bytes_downloaded': 0,
            'avg_response_time': 0
        }
        self.response_times = []
    
    async def __aenter__(self):
        """
        异步上下文管理器入口
        """
        connector = aiohttp.TCPConnector(
            limit=self.concurrent_limit,
            limit_per_host=20,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout
        )
        
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        异步上下文管理器出口
        """
        if self.session:
            await self.session.close()
    
    async def download(self, request: Request) -> Optional[Response]:
        """
        下载单个请求
        """
        async with self.semaphore:
            start_time = time.time()
            proxy = None  # 预先初始化,防止请求前出错时except分支引用未定义变量
            
            try:
                # 准备请求参数
                headers = self._prepare_headers(request.headers)
                proxy = self.proxy_manager.get_proxy()
                
                # 发送请求
                async with self.session.request(
                    method=request.method,
                    url=request.url,
                    headers=headers,
                    data=request.data,
                    proxy=proxy
                ) as resp:
                    
                    content = await resp.read()
                    text = await resp.text()
                    
                    response = Response(
                        url=str(resp.url),
                        status=resp.status,
                        headers=dict(resp.headers),
                        content=content,
                        text=text,
                        encoding=resp.charset or 'utf-8',
                        request=request,
                        meta=request.meta
                    )
                    
                    # 更新统计
                    response_time = time.time() - start_time
                    self._update_stats(True, len(content), response_time)
                    
                    if proxy:
                        self.proxy_manager.mark_proxy_success(proxy)
                    
                    self.logger.info(
                        f"下载成功: {request.url} [{resp.status}] "
                        f"{len(content)}字节 {response_time:.2f}秒"
                    )
                    
                    return response
            
            except Exception as e:
                # 更新统计
                response_time = time.time() - start_time
                self._update_stats(False, 0, response_time)
                
                if proxy:
                    self.proxy_manager.mark_proxy_failed(proxy)
                
                self.logger.error(f"下载失败: {request.url} - {str(e)}")
                return None
    
    def _prepare_headers(self, custom_headers: Dict) -> Dict:
        """
        准备请求头
        """
        headers = {
            'User-Agent': self.ua_manager.get_random_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        
        # 合并自定义头
        if custom_headers:
            headers.update(custom_headers)
        
        return headers
    
    def _update_stats(self, success: bool, bytes_count: int, response_time: float):
        """
        更新下载统计
        """
        self.stats['downloads_total'] += 1
        
        if success:
            self.stats['downloads_success'] += 1
            self.stats['bytes_downloaded'] += bytes_count
        else:
            self.stats['downloads_failed'] += 1
        
        self.response_times.append(response_time)
        if len(self.response_times) > 1000:  # 保持最近1000次的记录
            self.response_times.pop(0)
        
        self.stats['avg_response_time'] = sum(self.response_times) / len(self.response_times)
    
    def get_stats(self) -> Dict:
        """
        获取下载统计
        """
        return self.stats.copy()
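
下载器需要配合异步上下文管理器使用,以下为一个最小化的用法示意(Request 来自 2.1 节,URL 仅为示例):

async def fetch_one(url: str):
    async with Downloader(concurrent_limit=10, timeout=15) as downloader:
        response = await downloader.download(Request(url=url))
        if response:
            print(response.status, len(response.content))
        print(downloader.get_stats())

# asyncio.run(fetch_one('https://example.com'))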

2.3 解析器设计

from bs4 import BeautifulSoup
import lxml.html
import json
import re
import time
from typing import Dict, List, Any, Optional, Generator
from dataclasses import dataclass
import logging
from urllib.parse import urljoin, urlparse

@dataclass
class Item:
    """
    数据项
    """
    data: Dict
    source_url: str
    timestamp: float = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()

class BaseParser:
    """
    基础解析器
    """
    
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def parse(self, response: Response) -> Generator[Item, None, None]:
        """
        解析响应,生成数据项
        """
        raise NotImplementedError
    
    def extract_links(self, response: Response) -> List[str]:
        """
        提取链接
        """
        raise NotImplementedError

class HTMLParser(BaseParser):
    """
    HTML解析器
    """
    
    def __init__(self, parser_type: str = 'lxml'):
        super().__init__()
        self.parser_type = parser_type
    
    def parse_with_css(self, response: Response, selectors: Dict) -> Generator[Item, None, None]:
        """
        使用CSS选择器解析
        """
        soup = BeautifulSoup(response.text, self.parser_type)
        
        # 查找所有匹配的元素
        container_selector = selectors.get('container')
        if container_selector:
            containers = soup.select(container_selector)
        else:
            containers = [soup]
        
        for container in containers:
            item_data = {}
            
            for field, selector in selectors.items():
                if field == 'container':
                    continue
                
                elements = container.select(selector)
                if elements:
                    if len(elements) == 1:
                        item_data[field] = self._extract_text(elements[0])
                    else:
                        item_data[field] = [self._extract_text(el) for el in elements]
            
            if item_data:
                yield Item(data=item_data, source_url=response.url)
    
    def parse_with_xpath(self, response: Response, xpaths: Dict) -> Generator[Item, None, None]:
        """
        使用XPath解析
        """
        tree = lxml.html.fromstring(response.content)
        
        # 查找所有匹配的元素
        container_xpath = xpaths.get('container')
        if container_xpath:
            containers = tree.xpath(container_xpath)
        else:
            containers = [tree]
        
        for container in containers:
            item_data = {}
            
            for field, xpath in xpaths.items():
                if field == 'container':
                    continue
                
                elements = container.xpath(xpath)
                if elements:
                    if len(elements) == 1:
                        item_data[field] = self._extract_xpath_text(elements[0])
                    else:
                        item_data[field] = [self._extract_xpath_text(el) for el in elements]
            
            if item_data:
                yield Item(data=item_data, source_url=response.url)
    
    def extract_links(self, response: Response, 
                     link_selector: str = 'a[href]') -> List[str]:
        """
        提取页面链接
        """
        soup = BeautifulSoup(response.text, self.parser_type)
        links = []
        
        for link in soup.select(link_selector):
            href = link.get('href')
            if href:
                absolute_url = urljoin(response.url, href)
                links.append(absolute_url)
        
        return links
    
    def _extract_text(self, element) -> str:
        """
        提取元素文本
        """
        if hasattr(element, 'get_text'):
            return element.get_text(strip=True)
        elif hasattr(element, 'text'):
            return element.text.strip()
        else:
            return str(element).strip()
    
    def _extract_xpath_text(self, element) -> str:
        """
        提取XPath元素文本
        """
        if hasattr(element, 'text_content'):
            return element.text_content().strip()
        else:
            return str(element).strip()

class JSONParser(BaseParser):
    """
    JSON解析器
    """
    
    def parse_json_response(self, response: Response, 
                           json_path: str = None) -> Generator[Item, None, None]:
        """
        解析JSON响应
        """
        try:
            data = json.loads(response.text)
            
            if json_path:
                # 使用JSONPath提取数据
                data = self._extract_json_path(data, json_path)
            
            if isinstance(data, list):
                for item in data:
                    yield Item(data=item, source_url=response.url)
            else:
                yield Item(data=data, source_url=response.url)
        
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解析失败: {response.url} - {str(e)}")
    
    def _extract_json_path(self, data: Dict, path: str) -> Any:
        """
        简单的JSONPath实现
        """
        keys = path.split('.')
        current = data
        
        for key in keys:
            if isinstance(current, dict) and key in current:
                current = current[key]
            elif isinstance(current, list) and key.isdigit():
                index = int(key)
                if 0 <= index < len(current):
                    current = current[index]
                else:
                    return None
            else:
                return None
        
        return current

class RegexParser(BaseParser):
    """
    正则表达式解析器
    """
    
    def parse_with_regex(self, response: Response, 
                        patterns: Dict) -> Generator[Item, None, None]:
        """
        使用正则表达式解析
        """
        text = response.text
        
        # 查找所有匹配
        container_pattern = patterns.get('container')
        if container_pattern:
            containers = re.finditer(container_pattern, text, re.DOTALL)
        else:
            containers = [text]
        
        for container in containers:
            if isinstance(container, str):
                container_text = container
            else:
                container_text = container.group()
            
            item_data = {}
            
            for field, pattern in patterns.items():
                if field == 'container':
                    continue
                
                matches = re.findall(pattern, container_text)
                if matches:
                    if len(matches) == 1:
                        item_data[field] = matches[0]
                    else:
                        item_data[field] = matches
            
            if item_data:
                yield Item(data=item_data, source_url=response.url)

class ParserManager:
    """
    解析器管理器
    """
    
    def __init__(self):
        self.parsers = {
            'html': HTMLParser(),
            'json': JSONParser(),
            'regex': RegexParser()
        }
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def register_parser(self, name: str, parser: BaseParser):
        """
        注册解析器
        """
        self.parsers[name] = parser
        self.logger.info(f"注册解析器: {name}")
    
    def get_parser(self, name: str) -> Optional[BaseParser]:
        """
        获取解析器
        """
        return self.parsers.get(name)
    
    def parse_response(self, response: Response, 
                      parser_config: Dict) -> Generator[Item, None, None]:
        """
        解析响应
        """
        parser_type = parser_config.get('type', 'html')
        parser = self.get_parser(parser_type)
        
        if not parser:
            self.logger.error(f"未找到解析器: {parser_type}")
            return
        
        try:
            if parser_type == 'html':
                method = parser_config.get('method', 'css')
                selectors = parser_config.get('selectors', {})
                
                if method == 'css':
                    yield from parser.parse_with_css(response, selectors)
                elif method == 'xpath':
                    yield from parser.parse_with_xpath(response, selectors)
            
            elif parser_type == 'json':
                json_path = parser_config.get('json_path')
                yield from parser.parse_json_response(response, json_path)
            
            elif parser_type == 'regex':
                patterns = parser_config.get('patterns', {})
                yield from parser.parse_with_regex(response, patterns)
        
        except Exception as e:
            self.logger.error(f"解析失败: {response.url} - {str(e)}")
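
ParserManager 通过配置字典选择解析器与解析规则,以下配置示意如何用 CSS 选择器从列表页提取字段(选择器均为假设的示例值):

parser_config = {
    'type': 'html',          # 使用HTMLParser
    'method': 'css',         # CSS选择器方式
    'selectors': {
        'container': 'div.product-item',   # 每个数据项的容器
        'title': 'h2.title',
        'price': 'span.price'
    }
}

manager = ParserManager()
# for item in manager.parse_response(response, parser_config):
#     print(item.data)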

2.4 数据管道设计

import asyncio
from typing import List, Dict, Any, Optional, Callable
from abc import ABC, abstractmethod
import logging
import json
import csv
from pathlib import Path
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import pymongo
from datetime import datetime

Base = declarative_base()

class CrawledData(Base):
    """
    爬取数据表模型
    """
    __tablename__ = 'crawled_data'
    
    id = sa.Column(sa.Integer, primary_key=True)
    url = sa.Column(sa.String(500), nullable=False)
    data = sa.Column(sa.JSON, nullable=False)
    spider_name = sa.Column(sa.String(100), nullable=False)
    created_at = sa.Column(sa.DateTime, default=datetime.utcnow)
    updated_at = sa.Column(sa.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class BasePipeline(ABC):
    """
    基础管道
    """
    
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
    
    @abstractmethod
    async def process_item(self, item: Item, spider_name: str) -> Item:
        """
        处理数据项
        """
        pass
    
    async def open_spider(self, spider_name: str):
        """
        爬虫开始时调用
        """
        pass
    
    async def close_spider(self, spider_name: str):
        """
        爬虫结束时调用
        """
        pass

class ValidationPipeline(BasePipeline):
    """
    数据验证管道
    """
    
    def __init__(self, required_fields: List[str] = None,
                 field_types: Dict[str, type] = None):
        super().__init__()
        self.required_fields = required_fields or []
        self.field_types = field_types or {}
        self.stats = {
            'items_processed': 0,
            'items_valid': 0,
            'items_invalid': 0
        }
    
    async def process_item(self, item: Item, spider_name: str) -> Optional[Item]:
        """
        验证数据项
        """
        self.stats['items_processed'] += 1
        
        # 检查必需字段
        for field in self.required_fields:
            if field not in item.data or not item.data[field]:
                self.logger.warning(f"缺少必需字段 {field}: {item.source_url}")
                self.stats['items_invalid'] += 1
                return None
        
        # 检查字段类型
        for field, expected_type in self.field_types.items():
            if field in item.data:
                try:
                    if not isinstance(item.data[field], expected_type):
                        # 尝试类型转换
                        item.data[field] = expected_type(item.data[field])
                except (ValueError, TypeError):
                    self.logger.warning(
                        f"字段 {field} 类型错误: {item.source_url}"
                    )
                    self.stats['items_invalid'] += 1
                    return None
        
        self.stats['items_valid'] += 1
        return item
    
    def get_stats(self) -> Dict:
        return self.stats.copy()

class CleaningPipeline(BasePipeline):
    """
    数据清洗管道
    """
    
    def __init__(self, cleaning_rules: Dict[str, Callable] = None):
        super().__init__()
        self.cleaning_rules = cleaning_rules or {}
        self.default_rules = {
            'strip_whitespace': lambda x: x.strip() if isinstance(x, str) else x,
            'remove_html': self._remove_html_tags,
            'normalize_price': self._normalize_price
        }
    
    async def process_item(self, item: Item, spider_name: str) -> Item:
        """
        清洗数据项
        """
        for field, value in item.data.items():
            # 应用默认清洗规则
            if isinstance(value, str):
                value = value.strip()
            
            # 应用自定义清洗规则
            if field in self.cleaning_rules:
                try:
                    value = self.cleaning_rules[field](value)
                except Exception as e:
                    self.logger.warning(f"清洗字段 {field} 失败: {str(e)}")
            
            item.data[field] = value
        
        return item
    
    def _remove_html_tags(self, text: str) -> str:
        """
        移除HTML标签
        """
        import re
        clean = re.compile('<.*?>')
        return re.sub(clean, '', text)
    
    def _normalize_price(self, price: str) -> float:
        """
        标准化价格
        """
        import re
        if isinstance(price, str):
            # 提取数字
            numbers = re.findall(r'\d+\.?\d*', price.replace(',', ''))
            if numbers:
                return float(numbers[0])
        return 0.0

class DuplicateFilterPipeline(BasePipeline):
    """
    去重管道
    """
    
    def __init__(self, redis_url: str = 'redis://localhost:6379/1'):
        super().__init__()
        import redis
        self.redis_client = redis.from_url(redis_url)
        self.seen_key = 'crawler:seen_items'
        self.stats = {
            'items_processed': 0,
            'items_duplicate': 0,
            'items_unique': 0
        }
    
    async def process_item(self, item: Item, spider_name: str) -> Optional[Item]:
        """
        过滤重复项
        """
        self.stats['items_processed'] += 1
        
        # 生成项目指纹
        fingerprint = self._generate_fingerprint(item)
        
        # 检查是否已存在
        if self.redis_client.sismember(self.seen_key, fingerprint):
            self.logger.debug(f"重复项: {item.source_url}")
            self.stats['items_duplicate'] += 1
            return None
        
        # 标记为已见
        self.redis_client.sadd(self.seen_key, fingerprint)
        self.stats['items_unique'] += 1
        
        return item
    
    def _generate_fingerprint(self, item: Item) -> str:
        """
        生成项目指纹
        """
        import hashlib
        
        # 使用关键字段生成指纹
        key_data = {
            'url': item.source_url,
            'data': item.data
        }
        
        fingerprint_str = json.dumps(key_data, sort_keys=True)
        return hashlib.md5(fingerprint_str.encode()).hexdigest()
    
    def get_stats(self) -> Dict:
        return self.stats.copy()

class DatabasePipeline(BasePipeline):
    """
    数据库存储管道
    """
    
    def __init__(self, database_url: str):
        super().__init__()
        self.database_url = database_url
        self.engine = None
        self.Session = None
        self.stats = {
            'items_saved': 0,
            'items_failed': 0
        }
    
    async def open_spider(self, spider_name: str):
        """
        初始化数据库连接
        """
        self.engine = sa.create_engine(self.database_url)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)
        self.logger.info(f"数据库连接已建立: {spider_name}")
    
    async def close_spider(self, spider_name: str):
        """
        关闭数据库连接
        """
        if self.engine:
            self.engine.dispose()
        self.logger.info(f"数据库连接已关闭: {spider_name}")
    
    async def process_item(self, item: Item, spider_name: str) -> Item:
        """
        保存到数据库
        """
        session = self.Session()
        
        try:
            crawled_data = CrawledData(
                url=item.source_url,
                data=item.data,
                spider_name=spider_name
            )
            
            session.add(crawled_data)
            session.commit()
            
            self.stats['items_saved'] += 1
            self.logger.debug(f"数据已保存: {item.source_url}")
            
        except Exception as e:
            session.rollback()
            self.stats['items_failed'] += 1
            self.logger.error(f"保存失败: {item.source_url} - {str(e)}")
            
        finally:
            session.close()
        
        return item
    
    def get_stats(self) -> Dict:
        return self.stats.copy()

class MongoPipeline(BasePipeline):
    """
    MongoDB存储管道
    """
    
    def __init__(self, mongo_uri: str, database: str, collection: str):
        super().__init__()
        self.mongo_uri = mongo_uri
        self.database_name = database
        self.collection_name = collection
        self.client = None
        self.collection = None
        self.stats = {
            'items_saved': 0,
            'items_failed': 0
        }
    
    async def open_spider(self, spider_name: str):
        """
        初始化MongoDB连接
        """
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.collection = self.client[self.database_name][self.collection_name]
        self.logger.info(f"MongoDB连接已建立: {spider_name}")
    
    async def close_spider(self, spider_name: str):
        """
        关闭MongoDB连接
        """
        if self.client:
            self.client.close()
        self.logger.info(f"MongoDB连接已关闭: {spider_name}")
    
    async def process_item(self, item: Item, spider_name: str) -> Item:
        """
        保存到MongoDB
        """
        try:
            document = {
                'url': item.source_url,
                'data': item.data,
                'spider_name': spider_name,
                'timestamp': item.timestamp,
                'created_at': datetime.utcnow()
            }
            
            self.collection.insert_one(document)
            self.stats['items_saved'] += 1
            self.logger.debug(f"数据已保存到MongoDB: {item.source_url}")
            
        except Exception as e:
            self.stats['items_failed'] += 1
            self.logger.error(f"MongoDB保存失败: {item.source_url} - {str(e)}")
        
        return item
    
    def get_stats(self) -> Dict:
        return self.stats.copy()

class FilePipeline(BasePipeline):
    """
    文件存储管道
    """
    
    def __init__(self, output_dir: str, file_format: str = 'json'):
        super().__init__()
        self.output_dir = Path(output_dir)
        self.file_format = file_format
        self.files = {}
        self.stats = {
            'items_saved': 0,
            'items_failed': 0
        }
    
    async def open_spider(self, spider_name: str):
        """
        创建输出目录和文件
        """
        self.output_dir.mkdir(parents=True, exist_ok=True)
        
        if self.file_format == 'json':
            file_path = self.output_dir / f"{spider_name}.json"
            self.files[spider_name] = open(file_path, 'w', encoding='utf-8')
        elif self.file_format == 'csv':
            file_path = self.output_dir / f"{spider_name}.csv"
            self.files[spider_name] = open(file_path, 'w', encoding='utf-8', newline='')
        
        self.logger.info(f"输出文件已创建: {spider_name}")
    
    async def close_spider(self, spider_name: str):
        """
        关闭文件
        """
        if spider_name in self.files:
            self.files[spider_name].close()
            del self.files[spider_name]
        self.logger.info(f"输出文件已关闭: {spider_name}")
    
    async def process_item(self, item: Item, spider_name: str) -> Item:
        """
        保存到文件
        """
        try:
            if self.file_format == 'json':
                data = {
                    'url': item.source_url,
                    'data': item.data,
                    'timestamp': item.timestamp
                }
                self.files[spider_name].write(json.dumps(data, ensure_ascii=False) + '\n')
            
            elif self.file_format == 'csv':
                if spider_name not in self.files:
                    return item
                
                # 扁平化数据
                flat_data = self._flatten_dict(item.data)
                flat_data['url'] = item.source_url
                flat_data['timestamp'] = item.timestamp
                
                writer = csv.DictWriter(self.files[spider_name], fieldnames=flat_data.keys())
                if self.files[spider_name].tell() == 0:  # 文件为空,写入头部
                    writer.writeheader()
                writer.writerow(flat_data)
            
            self.stats['items_saved'] += 1
            
        except Exception as e:
            self.stats['items_failed'] += 1
            self.logger.error(f"文件保存失败: {item.source_url} - {str(e)}")
        
        return item
    
    def _flatten_dict(self, d: Dict, parent_key: str = '', sep: str = '_') -> Dict:
        """
        扁平化字典
        """
        items = []
        for k, v in d.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            if isinstance(v, dict):
                items.extend(self._flatten_dict(v, new_key, sep=sep).items())
            else:
                items.append((new_key, v))
        return dict(items)
    
    def get_stats(self) -> Dict:
        return self.stats.copy()

class PipelineManager:
    """
    管道管理器
    """
    
    def __init__(self):
        self.pipelines = []
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def add_pipeline(self, pipeline: BasePipeline):
        """
        添加管道
        """
        self.pipelines.append(pipeline)
        self.logger.info(f"添加管道: {pipeline.__class__.__name__}")
    
    async def open_spider(self, spider_name: str):
        """
        初始化所有管道
        """
        for pipeline in self.pipelines:
            await pipeline.open_spider(spider_name)
    
    async def close_spider(self, spider_name: str):
        """
        关闭所有管道
        """
        for pipeline in self.pipelines:
            await pipeline.close_spider(spider_name)
    
    async def process_item(self, item: Item, spider_name: str) -> Optional[Item]:
        """
        通过所有管道处理项目
        """
        current_item = item
        
        for pipeline in self.pipelines:
            try:
                current_item = await pipeline.process_item(current_item, spider_name)
                if current_item is None:
                    # 项目被过滤掉
                    return None
            except Exception as e:
                self.logger.error(
                    f"管道 {pipeline.__class__.__name__} 处理失败: {str(e)}"
                )
                return None
        
        return current_item
    
    def get_stats(self) -> Dict:
        """
        获取所有管道统计
        """
        stats = {}
        for pipeline in self.pipelines:
            if hasattr(pipeline, 'get_stats'):
                pipeline_name = pipeline.__class__.__name__
                stats[pipeline_name] = pipeline.get_stats()
        return stats
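
管道按添加顺序依次处理数据项,任一环节返回 None 即表示该项被过滤。以下是一个典型的管道组合示意(字段名与输出目录均为示例值):

pipeline_manager = PipelineManager()
pipeline_manager.add_pipeline(ValidationPipeline(required_fields=['title']))
pipeline_manager.add_pipeline(CleaningPipeline())
pipeline_manager.add_pipeline(DuplicateFilterPipeline())
pipeline_manager.add_pipeline(FilePipeline(output_dir='./data', file_format='json'))

# await pipeline_manager.open_spider('example_spider')
# item = await pipeline_manager.process_item(item, 'example_spider')
# await pipeline_manager.close_spider('example_spider')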

3. 反爬虫处理

3.1 反爬虫检测与应对

import asyncio
import random
import time
from typing import Dict, List, Optional
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import undetected_chromedriver as uc
from fake_useragent import UserAgent
import requests
from urllib.parse import urlparse

class AntiDetectionManager:
    """
    反检测管理器
    """
    
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.ua = UserAgent()
        self.session_cookies = {}
        
    def get_random_headers(self) -> Dict[str, str]:
        """
        获取随机请求头
        """
        headers = {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0'
        }
        
        # 随机添加一些可选头部
        optional_headers = {
            'X-Forwarded-For': self._generate_random_ip(),
            'X-Real-IP': self._generate_random_ip(),
            'X-Requested-With': 'XMLHttpRequest'
        }
        
        # 随机选择添加可选头部
        for key, value in optional_headers.items():
            if random.random() > 0.5:
                headers[key] = value
        
        return headers
    
    def _generate_random_ip(self) -> str:
        """
        生成随机IP地址
        """
        return f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}"
    
    def get_delay_time(self, base_delay: float = 1.0, 
                      randomize: bool = True) -> float:
        """
        获取延迟时间
        """
        if randomize:
            # 添加随机性,避免规律性访问
            return base_delay + random.uniform(0, base_delay)
        return base_delay
    
    def detect_anti_crawler(self, response: requests.Response) -> Dict[str, bool]:
        """
        检测反爬虫机制
        """
        detection_results = {
            'captcha': False,
            'rate_limit': False,
            'ip_blocked': False,
            'js_challenge': False,
            'cloudflare': False
        }
        
        # 检查状态码
        if response.status_code == 429:
            detection_results['rate_limit'] = True
        elif response.status_code == 403:
            detection_results['ip_blocked'] = True
        
        # 检查响应内容
        content = response.text.lower()
        
        # 验证码检测
        captcha_keywords = ['captcha', '验证码', 'verify', 'robot']
        if any(keyword in content for keyword in captcha_keywords):
            detection_results['captcha'] = True
        
        # Cloudflare检测
        if 'cloudflare' in content or 'cf-ray' in response.headers:
            detection_results['cloudflare'] = True
        
        # JavaScript挑战检测
        js_keywords = ['javascript', 'document.cookie', 'window.location']
        if any(keyword in content for keyword in js_keywords) and len(content) < 1000:
            detection_results['js_challenge'] = True
        
        return detection_results
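
反检测管理器可以直接配合普通 requests 请求使用,以下示意如何根据检测结果调整抓取策略(URL 仅为示例):

manager = AntiDetectionManager()
resp = requests.get('https://example.com', headers=manager.get_random_headers())

results = manager.detect_anti_crawler(resp)
if results['rate_limit']:
    time.sleep(manager.get_delay_time(base_delay=5.0))   # 命中限流,加大随机延迟
elif results['captcha'] or results['js_challenge']:
    pass  # 切换到浏览器渲染(见下文BrowserManager)或接入验证码识别服务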

class CaptchaSolver:
    """
    验证码解决器
    """
    
    def __init__(self, api_key: str = None):
        self.api_key = api_key
        self.logger = logging.getLogger(self.__class__.__name__)
    
    async def solve_image_captcha(self, image_url: str) -> Optional[str]:
        """
        解决图片验证码
        """
        try:
            # 这里可以集成第三方验证码识别服务
            # 如:2captcha, anti-captcha等
            self.logger.info(f"尝试解决验证码: {image_url}")
            
            # 模拟验证码识别过程
            await asyncio.sleep(10)  # 模拟识别时间
            
            # 返回识别结果(这里是示例)
            return "ABCD123"
            
        except Exception as e:
            self.logger.error(f"验证码识别失败: {str(e)}")
            return None
    
    async def solve_recaptcha(self, site_key: str, page_url: str) -> Optional[str]:
        """
        解决reCAPTCHA
        """
        try:
            # 集成reCAPTCHA解决方案
            self.logger.info(f"尝试解决reCAPTCHA: {site_key}")
            
            # 这里需要调用专门的reCAPTCHA解决服务
            await asyncio.sleep(30)  # reCAPTCHA通常需要更长时间
            
            return "recaptcha_response_token"
            
        except Exception as e:
            self.logger.error(f"reCAPTCHA解决失败: {str(e)}")
            return None

class BrowserManager:
    """
    浏览器管理器
    """
    
    def __init__(self, headless: bool = True, 
                 use_undetected: bool = True):
        self.headless = headless
        self.use_undetected = use_undetected
        self.drivers = []
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def create_driver(self, proxy: str = None) -> webdriver.Chrome:
        """
        创建浏览器驱动
        """
        if self.use_undetected:
            options = uc.ChromeOptions()
        else:
            options = Options()
        
        # 基础配置
        if self.headless:
            options.add_argument('--headless')
        
        # 反检测配置
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        if not self.use_undetected:
            # undetected-chromedriver已内置自动化特征处理,重复设置这些选项可能报错
            options.add_argument('--disable-blink-features=AutomationControlled')
            options.add_experimental_option("excludeSwitches", ["enable-automation"])
            options.add_experimental_option('useAutomationExtension', False)
        
        # 代理配置
        if proxy:
            options.add_argument(f'--proxy-server={proxy}')
        
        # 随机窗口大小
        width = random.randint(1200, 1920)
        height = random.randint(800, 1080)
        options.add_argument(f'--window-size={width},{height}')
        
        # 创建驱动
        if self.use_undetected:
            driver = uc.Chrome(options=options)
        else:
            driver = webdriver.Chrome(options=options)
        
        # 执行反检测脚本(通过CDP在每个新文档加载前注入;execute_script只对当前页面生效)
        driver.execute_cdp_cmd(
            "Page.addScriptToEvaluateOnNewDocument",
            {
                "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
            }
        )
        
        self.drivers.append(driver)
        return driver
    
    def close_all_drivers(self):
        """
        关闭所有驱动
        """
        for driver in self.drivers:
            try:
                driver.quit()
            except Exception as e:
                self.logger.error(f"关闭驱动失败: {str(e)}")
        self.drivers.clear()
    
    async def smart_page_load(self, driver: webdriver.Chrome, 
                             url: str, timeout: int = 30) -> bool:
        """
        智能页面加载
        """
        try:
            # 随机延迟
            await asyncio.sleep(random.uniform(1, 3))
            
            driver.get(url)
            
            # 等待页面加载完成
            WebDriverWait(driver, timeout).until(
                lambda d: d.execute_script('return document.readyState') == 'complete'
            )
            
            # 模拟人类行为
            await self._simulate_human_behavior(driver)
            
            return True
            
        except Exception as e:
            self.logger.error(f"页面加载失败: {url} - {str(e)}")
            return False
    
    async def _simulate_human_behavior(self, driver: webdriver.Chrome):
        """
        模拟人类行为
        """
        try:
            # 随机滚动
            scroll_count = random.randint(1, 3)
            for _ in range(scroll_count):
                scroll_y = random.randint(100, 500)
                driver.execute_script(f"window.scrollBy(0, {scroll_y});")
                await asyncio.sleep(random.uniform(0.5, 2))
            
            # 随机小幅鼠标移动(累计偏移过大会移出窗口并抛出异常,这里限制偏移范围)
            from selenium.webdriver.common.action_chains import ActionChains
            
            for _ in range(random.randint(1, 3)):
                x = random.randint(10, 100)
                y = random.randint(10, 100)
                ActionChains(driver).move_by_offset(x, y).perform()
                await asyncio.sleep(random.uniform(0.1, 0.5))
            
        except Exception as e:
            self.logger.warning(f"模拟人类行为失败: {str(e)}")

class ProxyRotator:
    """
    代理轮换器
    """
    
    def __init__(self, proxy_list: List[str] = None):
        self.proxy_list = proxy_list or []
        self.current_index = 0
        self.failed_proxies = set()
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def add_proxy(self, proxy: str):
        """
        添加代理
        """
        if proxy not in self.proxy_list:
            self.proxy_list.append(proxy)
            self.logger.info(f"添加代理: {proxy}")
    
    def get_next_proxy(self) -> Optional[str]:
        """
        获取下一个可用代理
        """
        if not self.proxy_list:
            return None
        
        attempts = 0
        while attempts < len(self.proxy_list):
            proxy = self.proxy_list[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.proxy_list)
            
            if proxy not in self.failed_proxies:
                return proxy
            
            attempts += 1
        
        # 如果所有代理都失败了,重置失败列表
        if attempts >= len(self.proxy_list):
            self.failed_proxies.clear()
            self.logger.warning("所有代理都失败,重置失败列表")
            return self.proxy_list[0] if self.proxy_list else None
        
        return None
    
    def mark_proxy_failed(self, proxy: str):
        """
        标记代理失败
        """
        self.failed_proxies.add(proxy)
        self.logger.warning(f"代理失败: {proxy}")
    
    async def test_proxy(self, proxy: str, test_url: str = 'http://httpbin.org/ip') -> bool:
        """
        测试代理可用性
        """
        try:
            proxies = {
                'http': proxy,
                'https': proxy
            }
            
            response = requests.get(test_url, proxies=proxies, timeout=10)
            return response.status_code == 200
            
        except Exception as e:
            self.logger.error(f"代理测试失败 {proxy}: {str(e)}")
            return False
    
    async def validate_all_proxies(self):
        """
        验证所有代理
        """
        valid_proxies = []
        
        for proxy in self.proxy_list:
            if await self.test_proxy(proxy):
                valid_proxies.append(proxy)
            else:
                self.mark_proxy_failed(proxy)
        
        self.proxy_list = valid_proxies
        self.logger.info(f"有效代理数量: {len(valid_proxies)}")

class RateLimiter:
    """
    速率限制器
    """
    
    def __init__(self, max_requests: int = 10, 
                 time_window: int = 60):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = []
        self.logger = logging.getLogger(self.__class__.__name__)
    
    async def acquire(self) -> bool:
        """
        获取请求许可
        """
        current_time = time.time()
        
        # 清理过期请求
        self.requests = [
            req_time for req_time in self.requests 
            if current_time - req_time < self.time_window
        ]
        
        # 检查是否超过限制
        if len(self.requests) >= self.max_requests:
            # 计算需要等待的时间
            oldest_request = min(self.requests)
            wait_time = self.time_window - (current_time - oldest_request)
            
            if wait_time > 0:
                self.logger.info(f"速率限制,等待 {wait_time:.2f} 秒")
                await asyncio.sleep(wait_time)
        
        # 记录本次请求(若上面发生过等待,重新取当前时间,避免写入过期时间戳)
        self.requests.append(time.time())
        return True
    
    def get_stats(self) -> Dict:
        """
        获取统计信息
        """
        current_time = time.time()
        recent_requests = [
            req_time for req_time in self.requests 
            if current_time - req_time < self.time_window
        ]
        
        return {
            'current_requests': len(recent_requests),
            'max_requests': self.max_requests,
            'time_window': self.time_window,
            'remaining_quota': self.max_requests - len(recent_requests)
        }
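
速率限制器的用法示意如下,超出配额时 acquire() 会自动等待(URL 列表与打印内容仅为示例):

limiter = RateLimiter(max_requests=5, time_window=60)   # 每60秒最多5个请求

async def polite_fetch(urls):
    for url in urls:
        await limiter.acquire()                          # 超出配额时自动休眠
        # response = await downloader.download(Request(url=url))
        print('fetch', url, limiter.get_stats()['remaining_quota'])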

3.2 会话管理

import aiohttp
import asyncio
from typing import Dict, Optional, Any
import json
from http.cookies import SimpleCookie
import logging
from datetime import datetime, timedelta

class SessionManager:
    """
    会话管理器
    """
    
    def __init__(self, max_sessions: int = 10, 
                 session_timeout: int = 3600):
        self.max_sessions = max_sessions
        self.session_timeout = session_timeout
        self.sessions = {}
        self.session_stats = {}
        self.logger = logging.getLogger(self.__class__.__name__)
    
    async def create_session(self, session_id: str = None, 
                           proxy: str = None,
                           headers: Dict = None) -> aiohttp.ClientSession:
        """
        创建会话
        """
        if session_id is None:
            session_id = f"session_{len(self.sessions)}"
        
        # 清理过期会话
        await self._cleanup_expired_sessions()
        
        # 检查会话数量限制
        if len(self.sessions) >= self.max_sessions:
            # 移除最旧的会话
            oldest_session_id = min(
                self.sessions.keys(),
                key=lambda x: self.session_stats[x]['created_at']
            )
            await self.close_session(oldest_session_id)
        
        # 配置连接器
        # 注意: aiohttp.TCPConnector 不接受 proxy 参数,代理需要在每次请求时
        # 通过 session.get(url, proxy=...) 传入,这里仅在 session_stats 中记录代理信息
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=10,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        
        # 配置超时
        timeout = aiohttp.ClientTimeout(
            total=30,
            connect=10,
            sock_read=10
        )
        
        # 创建会话
        session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers=headers or {},
            cookie_jar=aiohttp.CookieJar()
        )
        
        # 保存会话信息
        self.sessions[session_id] = session
        self.session_stats[session_id] = {
            'created_at': datetime.now(),
            'last_used': datetime.now(),
            'request_count': 0,
            'proxy': proxy
        }
        
        self.logger.info(f"创建会话: {session_id}")
        return session
    
    async def get_session(self, session_id: str) -> Optional[aiohttp.ClientSession]:
        """
        获取会话
        """
        if session_id in self.sessions:
            # 更新最后使用时间
            self.session_stats[session_id]['last_used'] = datetime.now()
            return self.sessions[session_id]
        return None
    
    async def close_session(self, session_id: str):
        """
        关闭会话
        """
        if session_id in self.sessions:
            await self.sessions[session_id].close()
            del self.sessions[session_id]
            del self.session_stats[session_id]
            self.logger.info(f"关闭会话: {session_id}")
    
    async def close_all_sessions(self):
        """
        关闭所有会话
        """
        for session_id in list(self.sessions.keys()):
            await self.close_session(session_id)
    
    async def _cleanup_expired_sessions(self):
        """
        清理过期会话
        """
        current_time = datetime.now()
        expired_sessions = []
        
        for session_id, stats in self.session_stats.items():
            if (current_time - stats['last_used']).total_seconds() > self.session_timeout:
                expired_sessions.append(session_id)
        
        for session_id in expired_sessions:
            await self.close_session(session_id)
    
    def get_session_stats(self) -> Dict:
        """
        获取会话统计
        """
        return {
            'total_sessions': len(self.sessions),
            'max_sessions': self.max_sessions,
            'session_details': self.session_stats.copy()
        }

class CookieManager:
    """
    Cookie管理器
    """
    
    def __init__(self, storage_file: str = None):
        self.storage_file = storage_file
        self.cookies = {}
        self.logger = logging.getLogger(self.__class__.__name__)
        
        if storage_file:
            self.load_cookies()
    
    def save_cookies_from_response(self, response: aiohttp.ClientResponse, 
                                  domain: str = None):
        """
        从响应中保存cookies
        """
        if domain is None:
            domain = response.url.host
        
        if domain not in self.cookies:
            self.cookies[domain] = {}
        
        # response.cookies 是 SimpleCookie,需通过 items() 取得 (名称, Morsel) 对
        for name, morsel in response.cookies.items():
            self.cookies[domain][name] = {
                'value': morsel.value,
                'expires': morsel['expires'] or None,
                'path': morsel['path'] or '/',
                'domain': morsel['domain'] or domain,
                'secure': bool(morsel['secure']),
                'httponly': bool(morsel['httponly'])
            }
        
        if self.storage_file:
            self.save_cookies()
    
    def get_cookies_for_domain(self, domain: str) -> Dict[str, str]:
        """
        获取域名的cookies
        """
        if domain in self.cookies:
            return {
                name: info['value'] 
                for name, info in self.cookies[domain].items()
                if not self._is_cookie_expired(info)
            }
        return {}
    
    def _is_cookie_expired(self, cookie_info: Dict) -> bool:
        """
        检查cookie是否过期
        """
        expires = cookie_info.get('expires')
        if expires:
            try:
                expire_time = datetime.strptime(expires, '%a, %d %b %Y %H:%M:%S %Z')
                return datetime.now() > expire_time
            except (ValueError, TypeError):
                pass
        return False
    
    def save_cookies(self):
        """
        保存cookies到文件
        """
        if self.storage_file:
            try:
                with open(self.storage_file, 'w') as f:
                    json.dump(self.cookies, f, indent=2)
                self.logger.info(f"Cookies已保存到: {self.storage_file}")
            except Exception as e:
                self.logger.error(f"保存cookies失败: {str(e)}")
    
    def load_cookies(self):
        """
        从文件加载cookies
        """
        if self.storage_file:
            try:
                with open(self.storage_file, 'r') as f:
                    self.cookies = json.load(f)
                self.logger.info(f"Cookies已从文件加载: {self.storage_file}")
            except FileNotFoundError:
                self.logger.info("Cookies文件不存在,使用空cookies")
            except Exception as e:
                self.logger.error(f"加载cookies失败: {str(e)}")
    
    def clear_cookies(self, domain: str = None):
        """
        清除cookies
        """
        if domain:
            if domain in self.cookies:
                del self.cookies[domain]
                self.logger.info(f"已清除域名 {domain} 的cookies")
        else:
            self.cookies.clear()
            self.logger.info("已清除所有cookies")
        
        if self.storage_file:
            self.save_cookies()
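
# 用法示意(假设性示例): 结合 SessionManager 与 CookieManager 完成一次请求,
# 并在响应带有 Set-Cookie 时将其持久化;URL 仅为占位
async def demo_session_and_cookies():
    session_manager = SessionManager(max_sessions=5)
    cookie_manager = CookieManager(storage_file='data/cookies.json')
    
    session = await session_manager.create_session('demo')
    async with session.get('https://example.com/') as response:
        cookie_manager.save_cookies_from_response(response)
        print(cookie_manager.get_cookies_for_domain(response.url.host))
    
    await session_manager.close_all_sessions()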

4. 爬虫引擎设计

4.1 核心引擎

import asyncio
from typing import Dict, List, Optional, Callable, Any
import logging
from dataclasses import dataclass, field
from datetime import datetime
from urllib.parse import urlparse
import signal

@dataclass
class SpiderConfig:
    """
    爬虫配置
    """
    name: str
    start_urls: List[str]
    allowed_domains: List[str] = field(default_factory=list)
    max_concurrent_requests: int = 16
    download_delay: float = 1.0
    randomize_delay: bool = True
    max_retry_times: int = 3
    retry_delay: float = 5.0
    timeout: int = 30
    follow_redirects: bool = True
    max_depth: int = 0  # 0表示无限制
    parser_config: Dict = field(default_factory=dict)
    pipeline_config: List[Dict] = field(default_factory=list)
    custom_settings: Dict = field(default_factory=dict)

class CrawlerEngine:
    """
    爬虫引擎
    """
    
    def __init__(self, config: SpiderConfig):
        self.config = config
        self.scheduler = Scheduler()
        self.downloader = Downloader()
        self.parser_manager = ParserManager()
        self.pipeline_manager = PipelineManager()
        self.session_manager = SessionManager()
        self.anti_detection = AntiDetectionManager()
        self.rate_limiter = RateLimiter(
            max_requests=config.max_concurrent_requests,
            time_window=60
        )
        
        self.running = False
        self.stats = {
            'start_time': None,
            'end_time': None,
            'requests_total': 0,
            'requests_success': 0,
            'requests_failed': 0,
            'items_scraped': 0,
            'items_dropped': 0
        }
        
        self.logger = logging.getLogger(self.__class__.__name__)
        self._setup_signal_handlers()
    
    def _setup_signal_handlers(self):
        """
        设置信号处理器
        """
        def signal_handler(signum, frame):
            self.logger.info(f"接收到信号 {signum},正在停止爬虫...")
            asyncio.create_task(self.stop())
        
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
    
    async def start(self):
        """
        启动爬虫
        """
        self.logger.info(f"启动爬虫: {self.config.name}")
        self.running = True
        self.stats['start_time'] = datetime.now()
        
        try:
            # 初始化组件
            await self._initialize_components()
            
            # 添加起始URL
            for url in self.config.start_urls:
                request = Request(
                    url=url,
                    priority=Priority.HIGH,
                    meta={'depth': 0}
                )
                await self.scheduler.add_request(request)
            
            # 启动爬取循环
            await self._crawl_loop()
            
        except Exception as e:
            self.logger.error(f"爬虫运行错误: {str(e)}")
        finally:
            await self._cleanup()
    
    async def stop(self):
        """
        停止爬虫
        """
        self.logger.info("正在停止爬虫...")
        self.running = False
    
    async def _initialize_components(self):
        """
        初始化组件
        """
        # 初始化管道
        for pipeline_config in self.config.pipeline_config:
            pipeline_type = pipeline_config.get('type')
            pipeline_params = pipeline_config.get('params', {})
            
            if pipeline_type == 'validation':
                pipeline = ValidationPipeline(**pipeline_params)
            elif pipeline_type == 'cleaning':
                pipeline = CleaningPipeline(**pipeline_params)
            elif pipeline_type == 'duplicate_filter':
                pipeline = DuplicateFilterPipeline(**pipeline_params)
            elif pipeline_type == 'database':
                pipeline = DatabasePipeline(**pipeline_params)
            elif pipeline_type == 'file':
                pipeline = FilePipeline(**pipeline_params)
            else:
                continue
            
            self.pipeline_manager.add_pipeline(pipeline)
        
        # 打开管道(调用各管道的 open_spider)
        await self.pipeline_manager.open_spider(self.config.name)
        
        self.logger.info("组件初始化完成")
    
    async def _crawl_loop(self):
        """
        爬取循环
        """
        # 创建并发任务
        semaphore = asyncio.Semaphore(self.config.max_concurrent_requests)
        tasks = []
        
        while self.running:
            # 检查是否有待处理的请求
            if self.scheduler.empty() and not tasks:
                self.logger.info("没有更多请求,爬虫结束")
                break
            
            # 获取请求并创建任务
            while len(tasks) < self.config.max_concurrent_requests and not self.scheduler.empty():
                request = await self.scheduler.get_request()
                if request:
                    task = asyncio.create_task(
                        self._process_request(request, semaphore)
                    )
                    tasks.append(task)
            
            # 等待任务完成
            if tasks:
                done, pending = await asyncio.wait(
                    tasks, 
                    return_when=asyncio.FIRST_COMPLETED,
                    timeout=1.0
                )
                
                # 处理完成的任务
                for task in done:
                    tasks.remove(task)
                    try:
                        await task
                    except Exception as e:
                        self.logger.error(f"任务执行错误: {str(e)}")
            
            # 短暂休息
            await asyncio.sleep(0.1)
        
        # 等待所有任务完成
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _process_request(self, request: Request, semaphore: asyncio.Semaphore):
        """
        处理单个请求
        """
        async with semaphore:
            try:
                # 速率限制
                await self.rate_limiter.acquire()
                
                # 延迟
                delay = self.anti_detection.get_delay_time(
                    self.config.download_delay,
                    self.config.randomize_delay
                )
                await asyncio.sleep(delay)
                
                # 下载
                response = await self._download_request(request)
                if response:
                    # 解析
                    await self._parse_response(response, request)
                    
                    self.stats['requests_success'] += 1
                else:
                    self.stats['requests_failed'] += 1
                
                self.stats['requests_total'] += 1
                
            except Exception as e:
                self.logger.error(f"处理请求失败 {request.url}: {str(e)}")
                self.stats['requests_failed'] += 1
                
                # 重试逻辑
                if request.retry_count < self.config.max_retry_times:
                    request.retry_count += 1
                    await asyncio.sleep(self.config.retry_delay)
                    await self.scheduler.add_request(request)
    
    async def _download_request(self, request: Request) -> Optional[Response]:
        """
        下载请求
        """
        try:
            # 获取会话
            session = await self.session_manager.get_session('default')
            if not session:
                session = await self.session_manager.create_session(
                    'default',
                    headers=self.anti_detection.get_random_headers()
                )
            
            # 执行下载
            response = await self.downloader.download(request, session)
            
            # 检测反爬虫: 这里可接入 AntiDetectionManager 的检测逻辑,
            # 并将 aiohttp 响应转换为自定义的 Response 对象(简化处理,直接返回)
            return response
            
        except Exception as e:
            self.logger.error(f"下载失败 {request.url}: {str(e)}")
            return None
    
    async def _parse_response(self, response: Response, request: Request):
        """
        解析响应
        """
        try:
            # 解析数据项
            items = self.parser_manager.parse_response(
                response, 
                self.config.parser_config
            )
            
            # 处理数据项
            for item in items:
                processed_item = await self.pipeline_manager.process_item(
                    item, 
                    self.config.name
                )
                
                if processed_item:
                    self.stats['items_scraped'] += 1
                else:
                    self.stats['items_dropped'] += 1
            
            # 提取新链接
            if self.config.max_depth == 0 or request.meta.get('depth', 0) < self.config.max_depth:
                new_links = self._extract_links(response, request)
                for link in new_links:
                    new_request = Request(
                        url=link,
                        priority=Priority.NORMAL,
                        meta={'depth': request.meta.get('depth', 0) + 1}
                    )
                    await self.scheduler.add_request(new_request)
            
        except Exception as e:
            self.logger.error(f"解析响应失败 {response.url}: {str(e)}")
    
    def _extract_links(self, response: Response, request: Request) -> List[str]:
        """
        提取链接
        """
        try:
            # 使用HTML解析器提取链接
            html_parser = self.parser_manager.get_parser('html')
            if html_parser:
                links = html_parser.extract_links(response)
                
                # 过滤域名
                if self.config.allowed_domains:
                    filtered_links = []
                    for link in links:
                        domain = urlparse(link).netloc
                        if any(allowed in domain for allowed in self.config.allowed_domains):
                            filtered_links.append(link)
                    return filtered_links
                
                return links
            
        except Exception as e:
            self.logger.error(f"提取链接失败: {str(e)}")
        
        return []
    
    async def _cleanup(self):
        """
        清理资源
        """
        self.stats['end_time'] = datetime.now()
        
        # 关闭管道
        await self.pipeline_manager.close_spider(self.config.name)
        
        # 关闭会话
        await self.session_manager.close_all_sessions()
        
        # 打印统计信息
        self._print_stats()
        
        self.logger.info("爬虫已停止")
    
    def _print_stats(self):
        """
        打印统计信息
        """
        duration = self.stats['end_time'] - self.stats['start_time']
        
        print("\n" + "="*50)
        print(f"爬虫统计 - {self.config.name}")
        print("="*50)
        print(f"运行时间: {duration}")
        print(f"总请求数: {self.stats['requests_total']}")
        print(f"成功请求: {self.stats['requests_success']}")
        print(f"失败请求: {self.stats['requests_failed']}")
        print(f"成功率: {self.stats['requests_success']/max(1, self.stats['requests_total'])*100:.2f}%")
        print(f"爬取项目: {self.stats['items_scraped']}")
        print(f"丢弃项目: {self.stats['items_dropped']}")
        
        # 管道统计
        pipeline_stats = self.pipeline_manager.get_stats()
        if pipeline_stats:
            print("\n管道统计:")
            for pipeline_name, stats in pipeline_stats.items():
                print(f"  {pipeline_name}: {stats}")
        
        print("="*50)
    
    def get_stats(self) -> Dict:
        """
        获取统计信息
        """
        stats = self.stats.copy()
        stats['pipeline_stats'] = self.pipeline_manager.get_stats()
        stats['scheduler_stats'] = self.scheduler.get_stats()
        stats['rate_limiter_stats'] = self.rate_limiter.get_stats()
        return stats
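
# 运行示意(假设性示例): 构造 SpiderConfig 并启动引擎。
# pipeline_config 中每一项需包含 type 与 params 两个键,
# type 对应 _initialize_components 支持的管道类型
# (validation / cleaning / duplicate_filter / database / file)
async def run_demo_spider():
    config = SpiderConfig(
        name='demo_spider',
        start_urls=['https://example.com/'],
        allowed_domains=['example.com'],
        max_concurrent_requests=4,
        download_delay=1.0,
        pipeline_config=[{'type': 'cleaning', 'params': {}}]
    )
    engine = CrawlerEngine(config)
    await engine.start()
    print(engine.get_stats())

# if __name__ == '__main__':
#     asyncio.run(run_demo_spider())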

6. 实战案例

6.1 电商网站爬虫

import asyncio
from typing import Dict, List
import json
from datetime import datetime

class EcommerceSpider:
    """
    电商网站爬虫示例
    """
    
    def __init__(self):
        # 创建项目结构
        self.project_structure = CrawlerProjectStructure()
        self.project_structure.create_structure('ecommerce_crawler')
        
        # 配置爬虫
        self.config = SpiderConfig(
            name='ecommerce_spider',
            start_urls=[
                'https://example-shop.com/products',
                'https://example-shop.com/categories'
            ],
            allowed_domains=['example-shop.com'],
            max_concurrent_requests=10,
            download_delay=1.0,
            # SpiderConfig 未定义 user_agent 字段,通过 custom_settings 传入
            custom_settings={'user_agent': 'EcommerceCrawler/1.0'}
        )
        
        # 初始化组件
        self.scheduler = Scheduler()
        self.downloader = Downloader()
        self.parser_manager = ParserManager()
        self.pipeline_manager = PipelineManager()
        self.logger = CrawlerLogger()
        
        # 设置解析器
        self._setup_parsers()
        
        # 设置数据管道
        self._setup_pipelines()
    
    def _setup_parsers(self):
        """
        设置解析器
        """
        # 产品列表解析器
        product_list_parser = HTMLParser()
        product_list_parser.add_css_rule('product_links', 'a.product-link::attr(href)')
        product_list_parser.add_css_rule('next_page', 'a.next-page::attr(href)')
        
        # 产品详情解析器
        product_detail_parser = HTMLParser()
        product_detail_parser.add_css_rule('title', 'h1.product-title::text')
        product_detail_parser.add_css_rule('price', '.price::text')
        product_detail_parser.add_css_rule('description', '.description::text')
        product_detail_parser.add_css_rule('images', 'img.product-image::attr(src)')
        product_detail_parser.add_css_rule('rating', '.rating::attr(data-rating)')
        product_detail_parser.add_css_rule('reviews_count', '.reviews-count::text')
        product_detail_parser.add_css_rule('availability', '.availability::text')
        product_detail_parser.add_css_rule('category', '.breadcrumb a::text')
        
        # 注册解析器
        self.parser_manager.register_parser('product_list', product_list_parser)
        self.parser_manager.register_parser('product_detail', product_detail_parser)
    
    def _setup_pipelines(self):
        """
        设置数据管道
        """
        # 数据验证管道
        validation_rules = {
            'title': {'required': True, 'type': str},
            'price': {'required': True, 'type': str},
            'description': {'required': False, 'type': str},
            'rating': {'required': False, 'type': float, 'min': 0, 'max': 5}
        }
        validation_pipeline = ValidationPipeline(validation_rules)
        
        # 数据清洗管道
        cleaning_pipeline = CleaningPipeline()
        
        # 去重管道
        duplicate_filter = DuplicateFilterPipeline(['title', 'price'])
        
        # 数据库存储管道
        db_config = {
            'host': 'localhost',
            'port': 5432,
            'database': 'ecommerce',
            'user': 'crawler',
            'password': 'password'
        }
        db_pipeline = DatabasePipeline(db_config, 'products')
        
        # 文件存储管道
        file_pipeline = FilePipeline('data/products.json', 'json')
        
        # 注册管道
        self.pipeline_manager.add_pipeline(validation_pipeline)
        self.pipeline_manager.add_pipeline(cleaning_pipeline)
        self.pipeline_manager.add_pipeline(duplicate_filter)
        self.pipeline_manager.add_pipeline(db_pipeline)
        self.pipeline_manager.add_pipeline(file_pipeline)
    
    async def parse_product_list(self, response: Response) -> List[Request]:
        """
        解析产品列表页面
        """
        parser = self.parser_manager.get_parser('product_list')
        data = await parser.parse(response)
        
        requests = []
        
        # 提取产品链接
        if 'product_links' in data:
            for link in data['product_links']:
                if link:
                    full_url = response.urljoin(link)
                    request = Request(
                        url=full_url,
                        callback='parse_product_detail',
                        meta={'category': response.meta.get('category', 'unknown')}
                    )
                    requests.append(request)
        
        # 提取下一页链接
        if 'next_page' in data and data['next_page']:
            next_url = response.urljoin(data['next_page'][0])
            request = Request(
                url=next_url,
                callback='parse_product_list',
                meta=response.meta
            )
            requests.append(request)
        
        return requests
    
    async def parse_product_detail(self, response: Response) -> Item:
        """
        解析产品详情页面
        """
        parser = self.parser_manager.get_parser('product_detail')
        data = await parser.parse(response)
        
        # 创建产品项目
        item = Item()
        item['url'] = response.url
        item['title'] = self._clean_text(data.get('title', [''])[0])
        item['price'] = self._extract_price(data.get('price', [''])[0])
        item['description'] = self._clean_text(data.get('description', [''])[0])
        item['images'] = [response.urljoin(img) for img in data.get('images', [])]
        item['rating'] = self._extract_rating(data.get('rating', [''])[0])
        item['reviews_count'] = self._extract_number(data.get('reviews_count', [''])[0])
        item['availability'] = self._clean_text(data.get('availability', [''])[0])
        item['category'] = response.meta.get('category', 'unknown')
        item['scraped_at'] = datetime.now().isoformat()
        
        return item
    
    def _clean_text(self, text: str) -> str:
        """
        清理文本
        """
        if not text:
            return ''
        return text.strip().replace('\n', ' ').replace('\t', ' ')
    
    def _extract_price(self, price_text: str) -> float:
        """
        提取价格
        """
        import re
        if not price_text:
            return 0.0
        
        # 提取数字
        price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
        if price_match:
            return float(price_match.group())
        return 0.0
    
    def _extract_rating(self, rating_text: str) -> float:
        """
        提取评分
        """
        if not rating_text:
            return 0.0
        try:
            return float(rating_text)
        except ValueError:
            return 0.0
    
    def _extract_number(self, text: str) -> int:
        """
        提取数字
        """
        import re
        if not text:
            return 0
        
        number_match = re.search(r'\d+', text.replace(',', ''))
        if number_match:
            return int(number_match.group())
        return 0
    
    async def run(self):
        """
        运行爬虫
        """
        # 创建爬虫引擎
        engine = CrawlerEngine(self.config)
        
        # 注册回调函数(假设引擎已扩展出 register_callback 与 wait_for_completion 接口,
        # 4.1 的基础 CrawlerEngine 未实现这两个方法,需自行扩展)
        engine.register_callback('parse_product_list', self.parse_product_list)
        engine.register_callback('parse_product_detail', self.parse_product_detail)
        
        # 设置组件
        engine.scheduler = self.scheduler
        engine.downloader = self.downloader
        engine.parser_manager = self.parser_manager
        engine.pipeline_manager = self.pipeline_manager
        
        try:
            # 启动爬虫
            await engine.start()
            
            # 等待完成
            await engine.wait_for_completion()
            
        except Exception as e:
            self.logger.log_error(e, {'spider': 'ecommerce_spider'})
        
        finally:
            # 停止爬虫
            await engine.stop()
            
            # 生成报告
            stats = engine.get_stats()
            print(f"爬取完成: {json.dumps(stats, indent=2)}")

# 使用示例
async def main():
    spider = EcommerceSpider()
    await spider.run()

if __name__ == '__main__':
    asyncio.run(main())

6.2 新闻网站爬虫

import asyncio
from typing import Dict, List
import json
from datetime import datetime, timedelta
import hashlib

class NewsSpider:
    """
    新闻网站爬虫示例
    """
    
    def __init__(self):
        # 配置爬虫
        self.config = SpiderConfig(
            name='news_spider',
            start_urls=[
                'https://example-news.com/latest',
                'https://example-news.com/technology',
                'https://example-news.com/business'
            ],
            allowed_domains=['example-news.com'],
            max_concurrent_requests=5,
            download_delay=2.0,
            # SpiderConfig 未定义 user_agent 字段,通过 custom_settings 传入
            custom_settings={'user_agent': 'NewsBot/1.0'}
        )
        
        # 初始化组件
        self.scheduler = Scheduler()
        self.downloader = Downloader()
        self.parser_manager = ParserManager()
        self.pipeline_manager = PipelineManager()
        self.logger = CrawlerLogger()
        
        # 设置解析器和管道
        self._setup_parsers()
        self._setup_pipelines()
    
    def _setup_parsers(self):
        """
        设置解析器
        """
        # 新闻列表解析器
        news_list_parser = HTMLParser()
        news_list_parser.add_css_rule('article_links', 'a.article-link::attr(href)')
        news_list_parser.add_css_rule('article_titles', 'h2.article-title::text')
        news_list_parser.add_css_rule('article_summaries', '.article-summary::text')
        news_list_parser.add_css_rule('publish_dates', '.publish-date::text')
        
        # 新闻详情解析器
        news_detail_parser = HTMLParser()
        news_detail_parser.add_css_rule('title', 'h1.article-title::text')
        news_detail_parser.add_css_rule('content', '.article-content::text')
        news_detail_parser.add_css_rule('author', '.author::text')
        news_detail_parser.add_css_rule('publish_date', '.publish-date::text')
        news_detail_parser.add_css_rule('tags', '.tag::text')
        news_detail_parser.add_css_rule('images', 'img.article-image::attr(src)')
        news_detail_parser.add_css_rule('category', '.category::text')
        
        # 注册解析器
        self.parser_manager.register_parser('news_list', news_list_parser)
        self.parser_manager.register_parser('news_detail', news_detail_parser)
    
    def _setup_pipelines(self):
        """
        设置数据管道
        """
        # 数据验证管道
        validation_rules = {
            'title': {'required': True, 'type': str, 'min_length': 5},
            'content': {'required': True, 'type': str, 'min_length': 100},
            'author': {'required': False, 'type': str},
            'publish_date': {'required': True, 'type': str}
        }
        validation_pipeline = ValidationPipeline(validation_rules)
        
        # 数据清洗管道
        cleaning_pipeline = CleaningPipeline()
        
        # 去重管道(基于标题和内容哈希)
        duplicate_filter = DuplicateFilterPipeline(['title_hash', 'content_hash'])
        
        # MongoDB存储管道
        mongo_config = {
            'host': 'localhost',
            'port': 27017,
            'database': 'news_db',
            'collection': 'articles'
        }
        mongo_pipeline = MongoPipeline(mongo_config)
        
        # 文件存储管道
        file_pipeline = FilePipeline('data/news.jsonl', 'jsonl')
        
        # 注册管道
        self.pipeline_manager.add_pipeline(validation_pipeline)
        self.pipeline_manager.add_pipeline(cleaning_pipeline)
        self.pipeline_manager.add_pipeline(duplicate_filter)
        self.pipeline_manager.add_pipeline(mongo_pipeline)
        self.pipeline_manager.add_pipeline(file_pipeline)
    
    async def parse_news_list(self, response: Response) -> List[Request]:
        """
        解析新闻列表页面
        """
        parser = self.parser_manager.get_parser('news_list')
        data = await parser.parse(response)
        
        requests = []
        
        # 提取文章链接
        if 'article_links' in data:
            for i, link in enumerate(data['article_links']):
                if link:
                    full_url = response.urljoin(link)
                    
                    # 获取对应的标题和摘要
                    titles = data.get('article_titles', [])
                    summaries = data.get('article_summaries', [])
                    dates = data.get('publish_dates', [])
                    title = titles[i] if i < len(titles) else ''
                    summary = summaries[i] if i < len(summaries) else ''
                    publish_date = dates[i] if i < len(dates) else ''
                    
                    # 检查发布日期(只爬取最近7天的新闻)
                    if self._is_recent_article(publish_date):
                        request = Request(
                            url=full_url,
                            callback='parse_news_detail',
                            meta={
                                'list_title': title,
                                'list_summary': summary,
                                'list_publish_date': publish_date,
                                'category': response.meta.get('category', 'general')
                            }
                        )
                        requests.append(request)
        
        return requests
    
    async def parse_news_detail(self, response: Response) -> Item:
        """
        解析新闻详情页面
        """
        parser = self.parser_manager.get_parser('news_detail')
        data = await parser.parse(response)
        
        # 创建新闻项目
        item = Item()
        item['url'] = response.url
        item['title'] = self._clean_text(data.get('title', [''])[0])
        item['content'] = self._clean_content(data.get('content', []))
        item['author'] = self._clean_text(data.get('author', [''])[0])
        item['publish_date'] = self._parse_date(data.get('publish_date', [''])[0])
        item['tags'] = [self._clean_text(tag) for tag in data.get('tags', [])]
        item['images'] = [response.urljoin(img) for img in data.get('images', [])]
        item['category'] = response.meta.get('category', 'general')
        item['scraped_at'] = datetime.now().isoformat()
        
        # 添加哈希值用于去重
        item['title_hash'] = hashlib.md5(item['title'].encode()).hexdigest()
        item['content_hash'] = hashlib.md5(item['content'].encode()).hexdigest()
        
        # 添加文本统计
        item['word_count'] = len(item['content'].split())
        item['char_count'] = len(item['content'])
        
        return item
    
    def _clean_text(self, text: str) -> str:
        """
        清理文本
        """
        if not text:
            return ''
        return text.strip().replace('\n', ' ').replace('\t', ' ')
    
    def _clean_content(self, content_list: List[str]) -> str:
        """
        清理内容
        """
        if not content_list:
            return ''
        
        # 合并所有段落
        content = ' '.join(content_list)
        
        # 清理多余的空白字符
        import re
        content = re.sub(r'\s+', ' ', content)
        
        return content.strip()
    
    def _parse_date(self, date_str: str) -> str:
        """
        解析日期
        """
        if not date_str:
            return ''
        
        # 这里可以添加更复杂的日期解析逻辑
        # 简化版本,直接返回原始字符串
        return date_str.strip()
    
    def _is_recent_article(self, publish_date: str) -> bool:
        """
        检查是否为最近的文章
        """
        if not publish_date:
            return True  # 如果没有日期信息,默认爬取
        
        try:
            # 这里需要根据实际的日期格式进行解析
            # 简化版本,总是返回True
            return True
        except Exception:
            return True
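    
    def _within_recent_days(self, publish_date: str, days: int = 7) -> bool:
        """
        示意(假设性实现): 若已知站点的日期格式,可在 _is_recent_article 中调用本方法,
        判断文章是否发布于最近 N 天内;格式列表需按目标网站实际情况调整
        """
        for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%d %b %Y'):
            try:
                published = datetime.strptime(publish_date.strip(), fmt)
                return datetime.now() - published <= timedelta(days=days)
            except ValueError:
                continue
        return True  # 无法解析时默认爬取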
    
    async def run(self):
        """
        运行爬虫
        """
        # 创建爬虫引擎
        engine = CrawlerEngine(self.config)
        
        # 注册回调函数(同样假设引擎已扩展 register_callback / wait_for_completion 接口)
        engine.register_callback('parse_news_list', self.parse_news_list)
        engine.register_callback('parse_news_detail', self.parse_news_detail)
        
        # 设置组件
        engine.scheduler = self.scheduler
        engine.downloader = self.downloader
        engine.parser_manager = self.parser_manager
        engine.pipeline_manager = self.pipeline_manager
        
        try:
            # 启动爬虫
            await engine.start()
            
            # 等待完成
            await engine.wait_for_completion()
            
        except Exception as e:
            self.logger.log_error(e, {'spider': 'news_spider'})
        
        finally:
            # 停止爬虫
            await engine.stop()
            
            # 生成报告
            stats = engine.get_stats()
            print(f"新闻爬取完成: {json.dumps(stats, indent=2)}")

# 使用示例
async def main():
    spider = NewsSpider()
    await spider.run()

if __name__ == '__main__':
    asyncio.run(main())

6.3 分布式爬虫部署

import asyncio
from typing import Dict, List
import json

class DistributedCrawlerDeployment:
    """
    分布式爬虫部署示例
    """
    
    def __init__(self, redis_url: str = 'redis://localhost:6379/0'):
        # 创建Redis队列
        self.redis_queue = RedisQueue(redis_url, 'distributed_crawler')
        
        # 创建分布式主节点
        self.master = DistributedMaster(self.redis_queue)
        
        # 爬虫配置
        self.spider_configs = {
            'ecommerce': SpiderConfig(
                name='ecommerce_spider',
                start_urls=['https://example-shop.com/products'],
                allowed_domains=['example-shop.com'],
                max_concurrent_requests=5,
                download_delay=1.0
            ),
            'news': SpiderConfig(
                name='news_spider',
                start_urls=['https://example-news.com/latest'],
                allowed_domains=['example-news.com'],
                max_concurrent_requests=3,
                download_delay=2.0
            )
        }
        
        # 监控器
        self.monitor = CrawlerMonitor()
    
    async def deploy_cluster(self, num_workers: int = 3):
        """
        部署爬虫集群
        """
        print(f"部署 {num_workers} 个工作节点...")
        
        # 启动工作节点
        for i in range(num_workers):
            worker_id = f"worker_{i+1}"
            await self.master.start_worker(worker_id, self.spider_configs)
            print(f"工作节点 {worker_id} 已启动")
        
        # 启动监控
        self.monitor.start_monitoring()
        
        print("集群部署完成")
    
    async def submit_crawl_tasks(self):
        """
        提交爬取任务
        """
        # 电商网站URL
        ecommerce_urls = [
            'https://example-shop.com/products?page=1',
            'https://example-shop.com/products?page=2',
            'https://example-shop.com/products?page=3',
            'https://example-shop.com/categories/electronics',
            'https://example-shop.com/categories/clothing'
        ]
        
        # 新闻网站URL
        news_urls = [
            'https://example-news.com/latest',
            'https://example-news.com/technology',
            'https://example-news.com/business',
            'https://example-news.com/sports',
            'https://example-news.com/entertainment'
        ]
        
        # 提交电商任务(高优先级)
        ecommerce_task_ids = await self.master.submit_urls(
            'ecommerce', ecommerce_urls, priority=10
        )
        print(f"已提交 {len(ecommerce_task_ids)} 个电商任务")
        
        # 提交新闻任务(普通优先级)
        news_task_ids = await self.master.submit_urls(
            'news', news_urls, priority=5
        )
        print(f"已提交 {len(news_task_ids)} 个新闻任务")
        
        return ecommerce_task_ids + news_task_ids
    
    async def monitor_cluster(self, duration: int = 300):
        """
        监控集群运行状态
        """
        print(f"开始监控集群,持续 {duration} 秒...")
        
        start_time = asyncio.get_event_loop().time()
        
        while (asyncio.get_event_loop().time() - start_time) < duration:
            # 获取集群统计
            stats = self.master.get_cluster_stats()
            
            print("\n=== 集群状态 ===")
            print(f"队列统计: {stats['queue_stats']}")
            print(f"工作节点数: {stats['total_workers']}")
            
            for worker_id, worker_stat in stats['worker_stats'].items():
                print(f"工作节点 {worker_id}: 已处理 {worker_stat['tasks_processed']} 个任务")
            
            # 生成监控报告
            report = self.monitor.generate_report()
            print(f"系统摘要: {report['summary']}")
            
            await asyncio.sleep(30)  # 每30秒检查一次
    
    async def shutdown_cluster(self):
        """
        关闭集群
        """
        print("正在关闭集群...")
        
        # 停止监控
        self.monitor.stop_monitoring()
        
        # 停止集群
        await self.master.stop_cluster()
        
        # 生成最终报告
        final_stats = self.redis_queue.get_queue_stats()
        print(f"最终统计: {json.dumps(final_stats, indent=2)}")
        
        print("集群已关闭")
    
    async def run_distributed_crawl(self, duration: int = 300):
        """
        运行分布式爬取
        """
        try:
            # 部署集群
            await self.deploy_cluster(num_workers=3)
            
            # 提交任务
            task_ids = await self.submit_crawl_tasks()
            print(f"总共提交了 {len(task_ids)} 个任务")
            
            # 监控集群
            await self.monitor_cluster(duration)
            
        except Exception as e:
            print(f"分布式爬取错误: {str(e)}")
        
        finally:
            # 关闭集群
            await self.shutdown_cluster()

# 使用示例
async def main():
    deployment = DistributedCrawlerDeployment()
    await deployment.run_distributed_crawl(duration=600)  # 运行10分钟

if __name__ == '__main__':
    asyncio.run(main())

7. 总结

7.1 技术要点

7.1.1 系统架构设计
  • 模块化设计: 将爬虫系统分解为独立的组件(调度器、下载器、解析器、管道等)
  • 异步编程: 使用asyncio实现高并发的网络请求处理
  • 可扩展性: 支持插件式的组件扩展和自定义
  • 容错机制: 实现重试、错误处理和故障恢复
7.1.2 核心组件
  • 调度器: 管理请求队列、优先级和去重
  • 下载器: 处理HTTP请求、代理管理和用户代理轮换
  • 解析器: 支持多种解析方式(CSS选择器、XPath、正则表达式)
  • 数据管道: 实现数据验证、清洗、去重和存储
  • 反爬虫处理: 包括验证码识别、浏览器模拟和行为随机化
7.1.3 分布式架构
  • Redis队列: 实现分布式任务调度和状态管理
  • 工作节点: 支持多进程/多机器的水平扩展
  • 负载均衡: 智能任务分配和资源利用
  • 监控管理: 实时监控集群状态和性能指标

7.2 最佳实践

7.2.1 性能优化
  • 并发控制: 合理设置并发数量,避免对目标网站造成过大压力
  • 请求间隔: 设置适当的下载延迟,模拟人类访问行为
  • 连接池: 复用HTTP连接,减少连接建立开销
  • 缓存机制: 缓存重复请求和解析结果(示意代码见下)
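
下面给出一个简单的内存缓存示意代码(假设性示例,ResponseCache 为本文补充的名称,并非前文组件):以 URL 的哈希值为键缓存解析结果,避免重复请求与重复解析;生产环境中可替换为带过期策略的 Redis 等外部缓存。

import hashlib

class ResponseCache:
    """
    解析结果的内存缓存(示意)
    """
    
    def __init__(self):
        self._cache = {}
    
    def _key(self, url: str) -> str:
        return hashlib.md5(url.encode()).hexdigest()
    
    def get(self, url: str):
        # 命中缓存时返回之前保存的解析结果,否则返回 None
        return self._cache.get(self._key(url))
    
    def set(self, url: str, parsed_result):
        self._cache[self._key(url)] = parsed_result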
7.2.2 反爬虫对策
  • 用户代理轮换: 使用多样化的User-Agent字符串
  • 代理IP: 使用代理池避免IP被封禁
  • 请求头随机化: 模拟真实浏览器的请求头
  • 行为模拟: 随机化点击、滚动等用户行为
7.2.3 数据质量
  • 数据验证: 实施严格的数据格式和内容验证
  • 去重机制: 多层次的重复数据检测和过滤
  • 数据清洗: 标准化数据格式和内容
  • 错误处理: 完善的异常捕获和错误恢复
7.2.4 监控运维
  • 日志管理: 详细的日志记录和分析
  • 性能监控: 实时监控系统资源和爬取效率
  • 告警机制: 及时发现和处理异常情况
  • 数据备份: 定期备份重要数据和配置

7.3 学习成果

通过本章学习,你应该掌握:

7.3.1 核心技能
  • 系统设计: 能够设计和实现大规模爬虫系统
  • 异步编程: 熟练使用asyncio进行并发编程
  • 网络协议: 深入理解HTTP协议和网络通信
  • 数据处理: 掌握数据提取、清洗和存储技术
  • 分布式系统: 理解分布式架构和实现方式
7.3.2 工程能力
  • 代码组织: 良好的代码结构和模块化设计
  • 错误处理: 完善的异常处理和容错机制
  • 性能优化: 系统性能分析和优化技能
  • 运维管理: 系统监控、日志管理和故障排查
7.3.3 业务理解
  • 合规意识: 了解爬虫的法律和道德边界
  • 反爬虫技术: 理解网站反爬虫机制和应对策略
  • 数据价值: 认识数据的商业价值和应用场景

7.4 扩展方向

7.4.1 技术深化
  • 机器学习: 集成ML模型进行智能内容识别和分类
  • 自然语言处理: 实现文本分析和信息抽取
  • 计算机视觉: 处理图像和视频内容
  • 区块链: 探索去中心化的数据采集方案
7.4.2 平台扩展
  • 移动端爬虫: 支持移动应用和小程序数据采集
  • API爬虫: 集成各种API接口进行数据获取
  • 实时爬虫: 实现实时数据流处理
  • 云原生: 基于Kubernetes的容器化部署
7.4.3 应用场景
  • 商业智能: 竞品分析和市场监控
  • 舆情监控: 社交媒体和新闻监控
  • 价格监控: 电商价格跟踪和分析
  • 学术研究: 大规模数据收集和分析

7.5 下一步学习建议

  1. 深入学习网络协议: 掌握HTTP/2、WebSocket等新协议
  2. 学习容器技术: 使用Docker和Kubernetes进行部署
  3. 掌握云服务: 利用AWS、Azure等云平台资源
  4. 学习数据科学: 将爬取的数据用于分析和建模
  5. 关注法律法规: 了解数据保护和隐私相关法律

通过系统学习网络爬虫技术,你将能够构建高效、稳定、可扩展的数据采集系统,为数据驱动的业务决策提供强有力的技术支持。

💡 提示: 在实际应用中,请务必遵守网站的robots.txt协议和相关法律法规,尊重数据版权和用户隐私。
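
下面是一个基于标准库 urllib.robotparser 的示意代码(假设性示例),用于在发起请求前检查目标 URL 是否被 robots.txt 允许抓取:

from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse

def is_allowed_by_robots(url: str, user_agent: str = '*') -> bool:
    # 读取目标站点的 robots.txt,判断给定 URL 是否允许被指定 User-Agent 抓取
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    parser = RobotFileParser()
    parser.set_url(robots_url)
    parser.read()
    return parser.can_fetch(user_agent, url)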

🔒 安全: 爬虫系统应该实施适当的安全措施,包括数据加密、访问控制和审计日志。

性能: 持续监控和优化系统性能,确保在大规模数据采集时的稳定性和效率。

4.2 分布式爬虫

import redis
import json
from typing import Dict, List, Optional
import asyncio
import logging
from dataclasses import dataclass, asdict
from datetime import datetime
import uuid

@dataclass
class DistributedTask:
    """
    分布式任务
    """
    task_id: str
    spider_name: str
    request_data: Dict
    priority: int = 0
    created_at: datetime = None
    assigned_to: str = None
    status: str = 'pending'  # pending, running, completed, failed
    retry_count: int = 0
    max_retries: int = 3
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()
        if self.task_id is None:
            self.task_id = str(uuid.uuid4())

class RedisQueue:
    """
    Redis队列管理器
    """
    
    def __init__(self, redis_url: str = 'redis://localhost:6379/0',
                 queue_name: str = 'crawler_queue'):
        self.redis_client = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.processing_queue = f"{queue_name}:processing"
        self.completed_queue = f"{queue_name}:completed"
        self.failed_queue = f"{queue_name}:failed"
        self.stats_key = f"{queue_name}:stats"
        self.logger = logging.getLogger(self.__class__.__name__)
    
    async def push_task(self, task: DistributedTask) -> bool:
        """
        推送任务到队列
        """
        try:
            task_data = json.dumps(asdict(task), default=str)
            # 使用优先级队列
            score = -task.priority  # 负数使高优先级排在前面
            result = self.redis_client.zadd(self.queue_name, {task_data: score})
            
            # 更新统计
            self.redis_client.hincrby(self.stats_key, 'total_tasks', 1)
            self.redis_client.hincrby(self.stats_key, 'pending_tasks', 1)
            
            self.logger.info(f"任务已推送: {task.task_id}")
            return bool(result)
            
        except Exception as e:
            self.logger.error(f"推送任务失败: {str(e)}")
            return False
    
    async def pop_task(self, worker_id: str) -> Optional[DistributedTask]:
        """
        从队列中获取任务
        """
        try:
            # 使用BZPOPMIN获取最高优先级任务
            # redis-py 为同步客户端,阻塞式的 bzpopmin 放入线程中执行,避免阻塞事件循环
            result = await asyncio.to_thread(
                self.redis_client.bzpopmin, self.queue_name, timeout=5
            )
            
            if result:
                queue_name, task_data, score = result
                task_dict = json.loads(task_data)
                
                # 重建datetime对象
                if 'created_at' in task_dict and task_dict['created_at']:
                    task_dict['created_at'] = datetime.fromisoformat(task_dict['created_at'])
                
                task = DistributedTask(**task_dict)
                task.assigned_to = worker_id
                task.status = 'running'
                
                # 移动到处理队列
                processing_data = json.dumps(asdict(task), default=str)
                self.redis_client.hset(self.processing_queue, task.task_id, processing_data)
                
                # 更新统计
                self.redis_client.hincrby(self.stats_key, 'pending_tasks', -1)
                self.redis_client.hincrby(self.stats_key, 'running_tasks', 1)
                
                return task
            
            return None
            
        except Exception as e:
            self.logger.error(f"获取任务失败: {str(e)}")
            return None
    
    async def complete_task(self, task_id: str, result_data: Dict = None) -> bool:
        """
        标记任务完成
        """
        try:
            # 从处理队列移除
            task_data = self.redis_client.hget(self.processing_queue, task_id)
            if task_data:
                self.redis_client.hdel(self.processing_queue, task_id)
                
                # 添加到完成队列
                task_dict = json.loads(task_data)
                task_dict['status'] = 'completed'
                task_dict['completed_at'] = datetime.now().isoformat()
                
                if result_data:
                    task_dict['result'] = result_data
                
                completed_data = json.dumps(task_dict)
                self.redis_client.hset(self.completed_queue, task_id, completed_data)
                
                # 更新统计
                self.redis_client.hincrby(self.stats_key, 'running_tasks', -1)
                self.redis_client.hincrby(self.stats_key, 'completed_tasks', 1)
                
                self.logger.info(f"任务完成: {task_id}")
                return True
            
            return False
            
        except Exception as e:
            self.logger.error(f"完成任务失败: {str(e)}")
            return False
    
    async def fail_task(self, task_id: str, error_msg: str = None) -> bool:
        """
        标记任务失败
        """
        try:
            # 从处理队列获取任务
            task_data = self.redis_client.hget(self.processing_queue, task_id)
            if task_data:
                task_dict = json.loads(task_data)
                task = DistributedTask(**task_dict)
                
                # 检查是否需要重试
                if task.retry_count < task.max_retries:
                    task.retry_count += 1
                    task.status = 'pending'
                    task.assigned_to = None
                    
                    # 重新加入队列
                    retry_data = json.dumps(asdict(task), default=str)
                    score = -task.priority
                    self.redis_client.zadd(self.queue_name, {retry_data: score})
                    
                    self.logger.info(f"任务重试: {task_id} (第{task.retry_count}次)")
                else:
                    # 移动到失败队列
                    task_dict['status'] = 'failed'
                    if error_msg:
                        task_dict['error'] = error_msg
                    
                    failed_data = json.dumps(task_dict)
                    self.redis_client.hset(self.failed_queue, task_id, failed_data)
                    
                    # 更新统计
                    self.redis_client.hincrby(self.stats_key, 'failed_tasks', 1)
                    
                    self.logger.error(f"任务失败: {task_id}")
                
                # 从处理队列移除
                self.redis_client.hdel(self.processing_queue, task_id)
                self.redis_client.hincrby(self.stats_key, 'running_tasks', -1)
                
                return True
            
            return False
            
        except Exception as e:
            self.logger.error(f"处理失败任务错误: {str(e)}")
            return False
    
    def get_queue_stats(self) -> Dict:
        """
        获取队列统计信息
        """
        try:
            stats = self.redis_client.hgetall(self.stats_key)
            return {k.decode(): int(v.decode()) for k, v in stats.items()}
        except Exception as e:
            self.logger.error(f"获取统计信息失败: {str(e)}")
            return {}
    
    def clear_queue(self, queue_type: str = 'all'):
        """
        清空队列
        """
        try:
            if queue_type in ['all', 'pending']:
                self.redis_client.delete(self.queue_name)
            if queue_type in ['all', 'processing']:
                self.redis_client.delete(self.processing_queue)
            if queue_type in ['all', 'completed']:
                self.redis_client.delete(self.completed_queue)
            if queue_type in ['all', 'failed']:
                self.redis_client.delete(self.failed_queue)
            if queue_type == 'all':
                self.redis_client.delete(self.stats_key)
            
            self.logger.info(f"队列已清空: {queue_type}")
            
        except Exception as e:
            self.logger.error(f"清空队列失败: {str(e)}")

class DistributedWorker:
    """
    分布式工作节点
    """
    
    def __init__(self, worker_id: str, redis_queue: RedisQueue,
                 spider_configs: Dict[str, SpiderConfig]):
        self.worker_id = worker_id
        self.redis_queue = redis_queue
        self.spider_configs = spider_configs
        self.running = False
        self.current_task = None
        self.stats = {
            'tasks_processed': 0,
            'tasks_completed': 0,
            'tasks_failed': 0,
            'start_time': None
        }
        self.logger = logging.getLogger(f"{self.__class__.__name__}-{worker_id}")
    
    async def start(self):
        """
        启动工作节点
        """
        self.logger.info(f"启动工作节点: {self.worker_id}")
        self.running = True
        self.stats['start_time'] = datetime.now()
        
        try:
            while self.running:
                # 获取任务
                task = await self.redis_queue.pop_task(self.worker_id)
                
                if task:
                    await self._process_task(task)
                else:
                    # 没有任务时短暂休息
                    await asyncio.sleep(1)
                    
        except Exception as e:
            self.logger.error(f"工作节点运行错误: {str(e)}")
        finally:
            self.logger.info(f"工作节点停止: {self.worker_id}")
    
    async def stop(self):
        """
        停止工作节点
        """
        self.running = False
        
        # 如果有正在处理的任务,标记为失败
        if self.current_task:
            await self.redis_queue.fail_task(
                self.current_task.task_id,
                "Worker stopped"
            )
    
    async def _process_task(self, task: DistributedTask):
        """
        处理任务
        """
        self.current_task = task
        self.stats['tasks_processed'] += 1
        
        try:
            self.logger.info(f"处理任务: {task.task_id}")
            
            # 获取爬虫配置
            spider_config = self.spider_configs.get(task.spider_name)
            if not spider_config:
                raise ValueError(f"未找到爬虫配置: {task.spider_name}")
            
            # 创建爬虫引擎
            engine = CrawlerEngine(spider_config)
            
            # 处理请求数据
            request_data = task.request_data
            request = Request(
                url=request_data['url'],
                method=request_data.get('method', 'GET'),
                headers=request_data.get('headers', {}),
                body=request_data.get('body'),
                meta=request_data.get('meta', {})
            )
            
            # 执行爬取(简化版本)
            result = await self._execute_crawl(engine, request)
            
            # 标记任务完成
            await self.redis_queue.complete_task(task.task_id, result)
            self.stats['tasks_completed'] += 1
            
        except Exception as e:
            self.logger.error(f"任务处理失败 {task.task_id}: {str(e)}")
            await self.redis_queue.fail_task(task.task_id, str(e))
            self.stats['tasks_failed'] += 1
        
        finally:
            self.current_task = None
    
    async def _execute_crawl(self, engine: CrawlerEngine, request: Request) -> Dict:
        """
        执行爬取
        """
        try:
            # 初始化引擎组件
            await engine._initialize_components()
            
            # 下载请求
            response = await engine._download_request(request)
            
            if response:
                # 解析响应
                await engine._parse_response(response, request)
                
                return {
                    'status': 'success',
                    'url': request.url,
                    'items_count': engine.stats.get('items_scraped', 0)
                }
            else:
                return {
                    'status': 'failed',
                    'url': request.url,
                    'error': 'Download failed'
                }
                
        except Exception as e:
            return {
                'status': 'error',
                'url': request.url,
                'error': str(e)
            }
        finally:
            # 清理资源
            await engine._cleanup()
    
    def get_stats(self) -> Dict:
        """
        获取工作节点统计信息
        """
        stats = self.stats.copy()
        if self.stats['start_time']:
            stats['uptime'] = datetime.now() - self.stats['start_time']
        stats['current_task'] = self.current_task.task_id if self.current_task else None
        return stats

class DistributedMaster:
    """
    分布式主节点
    """
    
    def __init__(self, redis_queue: RedisQueue):
        self.redis_queue = redis_queue
        self.workers = {}
        self.running = False
        self.logger = logging.getLogger(self.__class__.__name__)
    
    async def start_worker(self, worker_id: str, spider_configs: Dict[str, SpiderConfig]):
        """
        启动工作节点
        """
        if worker_id not in self.workers:
            worker = DistributedWorker(worker_id, self.redis_queue, spider_configs)
            
            # 以协程任务的方式启动工作节点(如需多进程部署,可结合 multiprocessing 扩展)
            task = asyncio.create_task(worker.start())
            self.workers[worker_id] = {'worker': worker, 'task': task}
            
            self.logger.info(f"工作节点已启动: {worker_id}")
        else:
            self.logger.warning(f"工作节点已存在: {worker_id}")
    
    async def stop_worker(self, worker_id: str):
        """
        停止工作节点
        """
        if worker_id in self.workers:
            worker_info = self.workers[worker_id]
            await worker_info['worker'].stop()
            worker_info['task'].cancel()
            
            del self.workers[worker_id]
            self.logger.info(f"工作节点已停止: {worker_id}")
        else:
            self.logger.warning(f"工作节点不存在: {worker_id}")
    
    async def submit_urls(self, spider_name: str, urls: List[str], 
                         priority: int = 0) -> List[str]:
        """
        提交URL任务
        """
        task_ids = []
        
        for url in urls:
            task = DistributedTask(
                task_id=str(uuid.uuid4()),
                spider_name=spider_name,
                request_data={'url': url},
                priority=priority
            )
            
            success = await self.redis_queue.push_task(task)
            if success:
                task_ids.append(task.task_id)
        
        self.logger.info(f"已提交 {len(task_ids)} 个任务")
        return task_ids
    
    def get_cluster_stats(self) -> Dict:
        """
        获取集群统计信息
        """
        queue_stats = self.redis_queue.get_queue_stats()
        
        worker_stats = {}
        for worker_id, worker_info in self.workers.items():
            worker_stats[worker_id] = worker_info['worker'].get_stats()
        
        return {
            'queue_stats': queue_stats,
            'worker_stats': worker_stats,
            'total_workers': len(self.workers)
        }
    
    async def monitor_cluster(self, interval: int = 30):
        """
        监控集群状态
        """
        self.running = True
        
        while self.running:
            try:
                stats = self.get_cluster_stats()
                
                self.logger.info("=== 集群状态 ===")
                self.logger.info(f"队列统计: {stats['queue_stats']}")
                self.logger.info(f"工作节点数: {stats['total_workers']}")
                
                for worker_id, worker_stat in stats['worker_stats'].items():
                    self.logger.info(f"工作节点 {worker_id}: {worker_stat}")
                
                await asyncio.sleep(interval)
                
            except Exception as e:
                self.logger.error(f"监控集群错误: {str(e)}")
                await asyncio.sleep(interval)
    
    async def stop_cluster(self):
        """
        停止整个集群
        """
        self.running = False
        
        # 停止所有工作节点
        for worker_id in list(self.workers.keys()):
            await self.stop_worker(worker_id)
        
        self.logger.info("集群已停止")

5. 监控与管理

5.1 性能监控

import psutil
import time
from typing import Dict, List, Optional
import logging
from datetime import datetime, timedelta
import threading
from dataclasses import dataclass
import json
from collections import deque
import matplotlib.pyplot as plt
import pandas as pd

@dataclass
class PerformanceMetrics:
    """
    性能指标
    """
    timestamp: datetime
    cpu_percent: float
    memory_percent: float
    memory_used_mb: float
    disk_io_read_mb: float
    disk_io_write_mb: float
    network_sent_mb: float
    network_recv_mb: float
    requests_per_second: float = 0.0
    response_time_avg: float = 0.0
    error_rate: float = 0.0
    active_connections: int = 0

class SystemMonitor:
    """
    系统监控器
    """
    
    def __init__(self, max_history: int = 1000):
        self.max_history = max_history
        self.metrics_history = deque(maxlen=max_history)
        self.monitoring = False
        self.monitor_thread = None
        self.logger = logging.getLogger(self.__class__.__name__)
        
        # 初始化基准值
        self._last_disk_io = psutil.disk_io_counters()
        self._last_network_io = psutil.net_io_counters()
        self._last_check_time = time.time()
    
    def start_monitoring(self, interval: float = 5.0):
        """
        开始监控
        """
        if not self.monitoring:
            self.monitoring = True
            self.monitor_thread = threading.Thread(
                target=self._monitor_loop,
                args=(interval,),
                daemon=True
            )
            self.monitor_thread.start()
            self.logger.info("系统监控已启动")
    
    def stop_monitoring(self):
        """
        停止监控
        """
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
        self.logger.info("系统监控已停止")
    
    def _monitor_loop(self, interval: float):
        """
        监控循环
        """
        while self.monitoring:
            try:
                metrics = self._collect_metrics()
                self.metrics_history.append(metrics)
                time.sleep(interval)
            except Exception as e:
                self.logger.error(f"监控错误: {str(e)}")
                time.sleep(interval)
    
    def _collect_metrics(self) -> PerformanceMetrics:
        """
        收集性能指标
        """
        current_time = time.time()
        time_delta = current_time - self._last_check_time
        
        # CPU和内存
        cpu_percent = psutil.cpu_percent()
        memory = psutil.virtual_memory()
        
        # 磁盘IO
        current_disk_io = psutil.disk_io_counters()
        disk_read_mb = 0
        disk_write_mb = 0
        
        if current_disk_io and self._last_disk_io and time_delta > 0:
            disk_read_mb = (current_disk_io.read_bytes - self._last_disk_io.read_bytes) / (1024 * 1024) / time_delta
            disk_write_mb = (current_disk_io.write_bytes - self._last_disk_io.write_bytes) / (1024 * 1024) / time_delta
        
        # 网络IO
        current_network_io = psutil.net_io_counters()
        network_sent_mb = 0
        network_recv_mb = 0
        
        if current_network_io and self._last_network_io and time_delta > 0:
            network_sent_mb = (current_network_io.bytes_sent - self._last_network_io.bytes_sent) / (1024 * 1024) / time_delta
            network_recv_mb = (current_network_io.bytes_recv - self._last_network_io.bytes_recv) / (1024 * 1024) / time_delta
        
        # 更新基准值
        self._last_disk_io = current_disk_io
        self._last_network_io = current_network_io
        self._last_check_time = current_time
        
        return PerformanceMetrics(
            timestamp=datetime.now(),
            cpu_percent=cpu_percent,
            memory_percent=memory.percent,
            memory_used_mb=memory.used / (1024 * 1024),
            disk_io_read_mb=disk_read_mb,
            disk_io_write_mb=disk_write_mb,
            network_sent_mb=network_sent_mb,
            network_recv_mb=network_recv_mb
        )
    
    def get_current_metrics(self) -> Optional[PerformanceMetrics]:
        """
        获取当前指标
        """
        if self.metrics_history:
            return self.metrics_history[-1]
        return None
    
    def get_metrics_history(self, minutes: int = 60) -> List[PerformanceMetrics]:
        """
        获取历史指标
        """
        cutoff_time = datetime.now() - timedelta(minutes=minutes)
        return [
            metric for metric in self.metrics_history
            if metric.timestamp >= cutoff_time
        ]
    
    def get_average_metrics(self, minutes: int = 60) -> Dict:
        """
        获取平均指标
        """
        history = self.get_metrics_history(minutes)
        
        if not history:
            return {}
        
        return {
            'cpu_percent': sum(m.cpu_percent for m in history) / len(history),
            'memory_percent': sum(m.memory_percent for m in history) / len(history),
            'memory_used_mb': sum(m.memory_used_mb for m in history) / len(history),
            'disk_io_read_mb': sum(m.disk_io_read_mb for m in history) / len(history),
            'disk_io_write_mb': sum(m.disk_io_write_mb for m in history) / len(history),
            'network_sent_mb': sum(m.network_sent_mb for m in history) / len(history),
            'network_recv_mb': sum(m.network_recv_mb for m in history) / len(history)
        }
    
    def export_metrics(self, filename: str, format: str = 'json'):
        """
        导出指标数据
        """
        try:
            if format == 'json':
                data = [
                    {
                        'timestamp': m.timestamp.isoformat(),
                        'cpu_percent': m.cpu_percent,
                        'memory_percent': m.memory_percent,
                        'memory_used_mb': m.memory_used_mb,
                        'disk_io_read_mb': m.disk_io_read_mb,
                        'disk_io_write_mb': m.disk_io_write_mb,
                        'network_sent_mb': m.network_sent_mb,
                        'network_recv_mb': m.network_recv_mb
                    }
                    for m in self.metrics_history
                ]
                
                with open(filename, 'w') as f:
                    json.dump(data, f, indent=2)
            
            elif format == 'csv':
                df = pd.DataFrame([
                    {
                        'timestamp': m.timestamp,
                        'cpu_percent': m.cpu_percent,
                        'memory_percent': m.memory_percent,
                        'memory_used_mb': m.memory_used_mb,
                        'disk_io_read_mb': m.disk_io_read_mb,
                        'disk_io_write_mb': m.disk_io_write_mb,
                        'network_sent_mb': m.network_sent_mb,
                        'network_recv_mb': m.network_recv_mb
                    }
                    for m in self.metrics_history
                ])
                df.to_csv(filename, index=False)
            
            self.logger.info(f"指标数据已导出到: {filename}")
            
        except Exception as e:
            self.logger.error(f"导出指标数据失败: {str(e)}")
    
    def plot_metrics(self, metrics: Optional[List[str]] = None, minutes: int = 60):
        """
        绘制指标图表
        """
        if metrics is None:
            metrics = ['cpu_percent', 'memory_percent']
        
        history = self.get_metrics_history(minutes)
        
        if not history:
            self.logger.warning("没有历史数据可绘制")
            return
        
        timestamps = [m.timestamp for m in history]
        
        fig, axes = plt.subplots(len(metrics), 1, figsize=(12, 4 * len(metrics)))
        if len(metrics) == 1:
            axes = [axes]
        
        for i, metric in enumerate(metrics):
            values = [getattr(m, metric) for m in history]
            axes[i].plot(timestamps, values, label=metric)
            axes[i].set_title(f'{metric} - 最近 {minutes} 分钟')
            axes[i].set_xlabel('时间')
            axes[i].set_ylabel(metric)
            axes[i].grid(True)
            axes[i].legend()
        
        plt.tight_layout()
        plt.show()
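
def system_monitor_demo():
    """
    SystemMonitor 使用示例(草图:采样间隔、运行时长与导出文件名均为示意值)
    """
    monitor = SystemMonitor(max_history=1000)
    monitor.start_monitoring(interval=5.0)           # 每 5 秒采样一次
    
    time.sleep(60)                                   # 让监控线程运行一段时间
    
    print(monitor.get_current_metrics())             # 最近一次采样
    print(monitor.get_average_metrics(minutes=1))    # 最近 1 分钟的平均值
    
    monitor.export_metrics('metrics.json', format='json')
    monitor.plot_metrics(['cpu_percent', 'memory_percent'], minutes=1)
    
    monitor.stop_monitoring()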

class CrawlerMonitor:
    """
    爬虫监控器
    """
    
    def __init__(self):
        self.crawlers = {}
        self.system_monitor = SystemMonitor()
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def register_crawler(self, crawler_name: str, engine: CrawlerEngine):
        """
        注册爬虫
        """
        self.crawlers[crawler_name] = {
            'engine': engine,
            'start_time': datetime.now(),
            'last_check': datetime.now()
        }
        self.logger.info(f"爬虫已注册: {crawler_name}")
    
    def unregister_crawler(self, crawler_name: str):
        """
        注销爬虫
        """
        if crawler_name in self.crawlers:
            del self.crawlers[crawler_name]
            self.logger.info(f"爬虫已注销: {crawler_name}")
    
    def get_crawler_stats(self, crawler_name: Optional[str] = None) -> Dict:
        """
        获取爬虫统计信息
        """
        if crawler_name:
            if crawler_name in self.crawlers:
                crawler_info = self.crawlers[crawler_name]
                engine_stats = crawler_info['engine'].get_stats()
                
                return {
                    'crawler_name': crawler_name,
                    'start_time': crawler_info['start_time'],
                    'uptime': datetime.now() - crawler_info['start_time'],
                    'engine_stats': engine_stats
                }
            return {}
        else:
            # 返回所有爬虫统计
            all_stats = {}
            for name in self.crawlers:
                all_stats[name] = self.get_crawler_stats(name)
            return all_stats
    
    def get_system_stats(self) -> Dict:
        """
        获取系统统计信息
        """
        current_metrics = self.system_monitor.get_current_metrics()
        average_metrics = self.system_monitor.get_average_metrics()
        
        return {
            'current_metrics': current_metrics.__dict__ if current_metrics else {},
            'average_metrics': average_metrics,
            'total_crawlers': len(self.crawlers)
        }
    
    def start_monitoring(self):
        """
        开始监控
        """
        self.system_monitor.start_monitoring()
        self.logger.info("爬虫监控已启动")
    
    def stop_monitoring(self):
        """
        停止监控
        """
        self.system_monitor.stop_monitoring()
        self.logger.info("爬虫监控已停止")
    
    def generate_report(self, output_file: Optional[str] = None) -> Dict:
        """
        生成监控报告
        """
        report = {
            'generated_at': datetime.now().isoformat(),
            'system_stats': self.get_system_stats(),
            'crawler_stats': self.get_crawler_stats(),
            'summary': self._generate_summary()
        }
        
        if output_file:
            try:
                with open(output_file, 'w') as f:
                    json.dump(report, f, indent=2, default=str)
                self.logger.info(f"监控报告已保存到: {output_file}")
            except Exception as e:
                self.logger.error(f"保存监控报告失败: {str(e)}")
        
        return report
    
    def _generate_summary(self) -> Dict:
        """
        生成摘要信息
        """
        crawler_stats = self.get_crawler_stats()
        
        total_requests = sum(
            stats.get('engine_stats', {}).get('requests_total', 0)
            for stats in crawler_stats.values()
        )
        
        total_success = sum(
            stats.get('engine_stats', {}).get('requests_success', 0)
            for stats in crawler_stats.values()
        )
        
        total_items = sum(
            stats.get('engine_stats', {}).get('items_scraped', 0)
            for stats in crawler_stats.values()
        )
        
        success_rate = (total_success / max(1, total_requests)) * 100
        
        return {
            'total_crawlers': len(crawler_stats),
            'total_requests': total_requests,
            'total_success': total_success,
            'success_rate': round(success_rate, 2),
            'total_items_scraped': total_items
        }
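
CrawlerMonitor 将系统指标与各爬虫引擎的运行统计汇总在一起,典型用法如下(示例草图:CrawlerEngine 的构造参数取决于前文实现,此处以注释示意;报告文件名为假设值)。

# CrawlerMonitor 使用示例(草图)
crawler_monitor = CrawlerMonitor()
crawler_monitor.start_monitoring()

# engine = CrawlerEngine(...)                       # 按前文方式创建引擎后再注册
# crawler_monitor.register_crawler('example', engine)

report = crawler_monitor.generate_report('monitor_report.json')
print(report['summary'])

crawler_monitor.stop_monitoring()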

5.2 日志管理

import logging
import logging.handlers
from typing import Dict, Optional
import json
from datetime import datetime
from pathlib import Path
import gzip
import shutil

class CrawlerLogger:
    """
    爬虫日志管理器
    """
    
    def __init__(self, log_dir: str = 'logs', 
                 max_file_size: int = 10 * 1024 * 1024,  # 10MB
                 backup_count: int = 5,
                 log_level: str = 'INFO'):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(exist_ok=True)
        
        self.max_file_size = max_file_size
        self.backup_count = backup_count
        self.log_level = getattr(logging, log_level.upper())
        
        self.loggers = {}
        self._setup_main_logger()
    
    def _setup_main_logger(self):
        """
        设置主日志记录器
        """
        # 创建主日志记录器
        main_logger = logging.getLogger('crawler')
        main_logger.setLevel(self.log_level)
        
        # 清除现有处理器
        main_logger.handlers.clear()
        
        # 文件处理器
        file_handler = logging.handlers.RotatingFileHandler(
            self.log_dir / 'crawler.log',
            maxBytes=self.max_file_size,
            backupCount=self.backup_count,
            encoding='utf-8'
        )
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        
        # 设置格式
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        main_logger.addHandler(file_handler)
        main_logger.addHandler(console_handler)
        
        self.loggers['main'] = main_logger
    
    def get_logger(self, name: str) -> logging.Logger:
        """
        获取指定名称的日志记录器
        """
        if name not in self.loggers:
            logger = logging.getLogger(f'crawler.{name}')
            logger.setLevel(self.log_level)
            
            # 文件处理器
            file_handler = logging.handlers.RotatingFileHandler(
                self.log_dir / f'{name}.log',
                maxBytes=self.max_file_size,
                backupCount=self.backup_count,
                encoding='utf-8'
            )
            
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            
            logger.addHandler(file_handler)
            self.loggers[name] = logger
        
        return self.loggers[name]
    
    def log_request(self, request: Request, response: Optional[Response] = None,
                   error: Optional[str] = None):
        """
        记录请求日志
        """
        logger = self.get_logger('requests')
        
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'url': request.url,
            'method': request.method,
            'status': 'success' if response else 'failed'
        }
        
        if response:
            log_data.update({
                'status_code': getattr(response, 'status', None),
                'content_length': len(getattr(response, 'content', ''))
            })
        
        if error:
            log_data['error'] = error
        
        logger.info(json.dumps(log_data, ensure_ascii=False))
    
    def log_item(self, item: Dict, spider_name: str, pipeline: str = None):
        """
        记录数据项日志
        """
        logger = self.get_logger('items')
        
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'spider': spider_name,
            'pipeline': pipeline,
            'item': item
        }
        
        logger.info(json.dumps(log_data, ensure_ascii=False))
    
    def log_error(self, error: Exception, context: Dict = None):
        """
        记录错误日志
        """
        logger = self.get_logger('errors')
        
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'error_type': type(error).__name__,
            'error_message': str(error),
            'context': context or {}
        }
        
        logger.error(json.dumps(log_data, ensure_ascii=False))
    
    def log_stats(self, stats: Dict, spider_name: str):
        """
        记录统计日志
        """
        logger = self.get_logger('stats')
        
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'spider': spider_name,
            'stats': stats
        }
        
        logger.info(json.dumps(log_data, ensure_ascii=False))
    
    def compress_old_logs(self, days_old: int = 7):
        """
        压缩旧日志文件
        """
        cutoff_time = datetime.now().timestamp() - (days_old * 24 * 3600)
        
        for log_file in self.log_dir.glob('*.log.*'):
            # 跳过已压缩的 .gz 文件,避免重复压缩
            if log_file.suffix == '.gz':
                continue
            
            if log_file.stat().st_mtime < cutoff_time:
                compressed_file = log_file.with_suffix(log_file.suffix + '.gz')
                
                with open(log_file, 'rb') as f_in:
                    with gzip.open(compressed_file, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                
                log_file.unlink()
                print(f"已压缩日志文件: {compressed_file}")
    
    def cleanup_logs(self, days_to_keep: int = 30):
        """
        清理旧日志文件
        """
        cutoff_time = datetime.now().timestamp() - (days_to_keep * 24 * 3600)
        
        for log_file in self.log_dir.glob('*.log.*'):
            if log_file.stat().st_mtime < cutoff_time:
                log_file.unlink()
                print(f"已删除旧日志文件: {log_file}")
    
    def get_log_stats(self) -> Dict:
        """
        获取日志统计信息
        """
        stats = {
            'total_files': 0,
            'total_size_mb': 0,
            'files_by_type': {}
        }
        
        for log_file in self.log_dir.glob('*'):
            if log_file.is_file():
                stats['total_files'] += 1
                file_size = log_file.stat().st_size / (1024 * 1024)
                stats['total_size_mb'] += file_size
                
                file_type = log_file.stem.split('.')[0]
                if file_type not in stats['files_by_type']:
                    stats['files_by_type'][file_type] = {
                        'count': 0,
                        'size_mb': 0
                    }
                
                stats['files_by_type'][file_type]['count'] += 1
                stats['files_by_type'][file_type]['size_mb'] += file_size
        
        stats['total_size_mb'] = round(stats['total_size_mb'], 2)
        
        for file_type in stats['files_by_type']:
            stats['files_by_type'][file_type]['size_mb'] = round(
                stats['files_by_type'][file_type]['size_mb'], 2
            )
        
        return stats
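
CrawlerLogger 的典型用法如下(示例草图:日志目录、爬虫名称与统计数据均为示意值)。

# CrawlerLogger 使用示例(草图)
crawler_logger = CrawlerLogger(log_dir='logs', log_level='INFO')

spider_logger = crawler_logger.get_logger('example_spider')
spider_logger.info('爬虫启动')

# 记录统计与错误日志(内容为示意)
crawler_logger.log_stats({'requests_total': 100, 'items_scraped': 80}, 'example_spider')
crawler_logger.log_error(ValueError('解析失败'), context={'url': 'https://example.com'})

# 定期维护日志文件
crawler_logger.compress_old_logs(days_old=7)
crawler_logger.cleanup_logs(days_to_keep=30)
print(crawler_logger.get_log_stats())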