042 - Image Analysis and AI

🔴 Expert level | ⏱️ Estimated reading time: 45 minutes | 🕐 Hands-on time: 3-4 hours

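Learning Objectives

After working through this chapter, you will be able to:

  • Apply deep learning to image analysis
  • Combine computer vision techniques with ExifTool
  • Build AI-driven image processing and enhancement
  • Design intelligent image tagging and classification systems
  • Perform image content understanding and scene recognition
  • Run real-time image analysis and processing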

Deep Learning Image Analysis Framework

Core Architecture Design
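
The framework in the listing below loads only the models whose keys appear under `config['models']`. The same config-driven idea can be sketched without any deep learning dependency. This is a minimal illustration, not the framework's actual code: `load_classifier`, `load_detector`, `LOADERS` and `build_models` are hypothetical names, and the loaders return plain dicts instead of real models.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-ins for the real loaders; each returns a plain dict
# instead of a torch model so the sketch runs without any ML dependency.
def load_classifier(cfg: Dict[str, Any]) -> Dict[str, Any]:
    return {"kind": "classification", "name": cfg.get("name", "efficientnet_b0")}

def load_detector(cfg: Dict[str, Any]) -> Dict[str, Any]:
    return {"kind": "detection", "name": cfg.get("name", "fasterrcnn_resnet50_fpn")}

# Table mapping a config key to the function that loads that model type.
LOADERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "classification": load_classifier,
    "detection": load_detector,
}

def build_models(config: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Load only the models that appear under config['models']."""
    loaded = {}
    for key, model_cfg in config.get("models", {}).items():
        loader = LOADERS.get(key)
        if loader is None:
            # Unknown keys are skipped rather than raising.
            continue
        loaded[key] = loader(model_cfg)
    return loaded

registry = build_models(
    {"models": {"classification": {"name": "resnet50"}, "unknown": {}}}
)
print(sorted(registry))
```

A lookup table like this keeps the set of supported model types in one place, whereas the listing spells out one `if` per model type.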

# File path: ai/image_analysis_framework.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.models as models
import cv2
import numpy as np
import logging
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass
from pathlib import Path
import json
from PIL import Image
import albumentations as A
from albumentations.pytorch import ToTensorV2
import timm
from transformers import CLIPProcessor, CLIPModel
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns

@dataclass
class AnalysisResult:
    """Result of analyzing a single image."""
    image_path: str
    predictions: Dict[str, float]
    features: Optional[np.ndarray]  # None when feature extraction is not run
    confidence: float
    processing_time: float
    metadata: Dict[str, Any]
    annotations: List[Dict[str, Any]]

class ImageAnalysisFramework:
    """Deep learning image analysis framework."""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.models = {}
        self.processors = {}
        self.transforms = {}
        self.logger = logging.getLogger(f"{__name__}.ImageAnalysisFramework")
        
        # Initialize models and preprocessing transforms
        self._initialize_models()
        self._initialize_transforms()
    
    def _initialize_models(self):
        """Load every model that is enabled in the config."""
        # Image classification model
        if 'classification' in self.config.get('models', {}):
            self._load_classification_model()
        
        # Object detection model
        if 'detection' in self.config.get('models', {}):
            self._load_detection_model()
        
        # Semantic segmentation model
        if 'segmentation' in self.config.get('models', {}):
            self._load_segmentation_model()
        
        # CLIP model (multimodal)
        if 'clip' in self.config.get('models', {}):
            self._load_clip_model()
        
        # Feature extraction model
        if 'feature_extraction' in self.config.get('models', {}):
            self._load_feature_extraction_model()
    
    def _load_classification_model(self):
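        """Load the image classification model."""
        model_config = self.config['models']['classification']
        model_name = model_config.get('name', 'efficientnet_b0')
        num_classes = model_config.get('num_classes', 1000)
        pretrained = model_config.get('pretrained', True)
        
        if model_name.startswith('efficientnet'):
            # Use the EfficientNet implementation from the timm library
            model = timm.create_model(
                model_name, 
                pretrained=pretrained, 
                num_classes=num_classes
            )
        else:
            # Use a torchvision model. Note: `pretrained=` is deprecated since
            # torchvision 0.13 in favor of the `weights=` argument.
            model = getattr(models, model_name)(pretrained=pretrained)
            
            # Replace the head only when the class count differs; otherwise the
            # pretrained classifier weights would be thrown away.
            head_name = 'classifier' if hasattr(model, 'classifier') else 'fc'
            head = getattr(model, head_name, None)
            if isinstance(head, nn.Sequential):
                # e.g. VGG or MobileNet: the Linear head is the last layer
                last = head[-1]
                if isinstance(last, nn.Linear) and last.out_features != num_classes:
                    head[-1] = nn.Linear(last.in_features, num_classes)
            elif isinstance(head, nn.Linear) and head.out_features != num_classes:
                setattr(model, head_name, nn.Linear(head.in_features, num_classes))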
        
        model = model.to(self.device)
        model.eval()
        
        self.models['classification'] = model
        self.logger.info(f"Loaded classification model: {model_name}")
    
    def _load_detection_model(self):
        """Load the object detection model."""
        model_config = self.config['models']['detection']
        model_name = model_config.get('name', 'fasterrcnn_resnet50_fpn')
        
        # Use a pretrained detection model from torchvision
        model = getattr(models.detection, model_name)(pretrained=True)
        model = model.to(self.device)
        model.eval()
        
        self.models['detection'] = model
        self.logger.info(f"Loaded detection model: {model_name}")
    
    def _load_segmentation_model(self):
        """Load the semantic segmentation model."""
        model_config = self.config['models']['segmentation']
        model_name = model_config.get('name', 'deeplabv3_resnet50')
        
        # Use a pretrained segmentation model from torchvision
        model = getattr(models.segmentation, model_name)(pretrained=True)
        model = model.to(self.device)
        model.eval()
        
        self.models['segmentation'] = model
        self.logger.info(f"Loaded segmentation model: {model_name}")
    
    def _load_clip_model(self):
        """Load the CLIP multimodal model."""
        model_config = self.config['models']['clip']
        model_name = model_config.get('name', 'openai/clip-vit-base-patch32')
        
        # Load the CLIP model and its processor
        model = CLIPModel.from_pretrained(model_name)
        processor = CLIPProcessor.from_pretrained(model_name)
        
        model = model.to(self.device)
        model.eval()
        
        self.models['clip'] = model
        self.processors['clip'] = processor
        self.logger.info(f"Loaded CLIP model: {model_name}")
    
    def _load_feature_extraction_model(self):
        """Load the feature extraction model."""
        model_config = self.config['models']['feature_extraction']
        model_name = model_config.get('name', 'resnet50')
        
        # Use a pretrained model as the feature extractor
        model = getattr(models, model_name)(pretrained=True)
        
        # Drop the final classification layer
        if hasattr(model, 'fc') or hasattr(model, 'classifier'):
            model = nn.Sequential(*list(model.children())[:-1])
        
        model = model.to(self.device)
        model.eval()
        
        self.models['feature_extraction'] = model
        self.logger.info(f"Loaded feature extraction model: {model_name}")
    
    def _initialize_transforms(self):
        """Initialize the image transforms."""
        # Transform for the classification model
        self.transforms['classification'] = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])
        
        # Transform for the detection model
        self.transforms['detection'] = transforms.Compose([
            transforms.ToTensor()
        ])
        
        # Transform for the segmentation model
        self.transforms['segmentation'] = transforms.Compose([
            transforms.Resize((520, 520)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])
        
        # Transform for feature extraction
        self.transforms['feature_extraction'] = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])
    
    def analyze_image(self, image_path: str, analysis_types: Optional[List[str]] = None) -> AnalysisResult:
        """Analyze a single image."""
        import time
        start_time = time.time()
        
        if analysis_types is None:
            analysis_types = list(self.models.keys())
        
        # Load the image
        image = Image.open(image_path).convert('RGB')
        
        # Run each requested analysis
        predictions = {}
        features = None
        annotations = []
        
        for analysis_type in analysis_types:
            if analysis_type not in self.models:
                continue
            
            try:
                if analysis_type == 'classification':
                    result = self._classify_image(image)
                    predictions.update(result)
                
                elif analysis_type == 'detection':
                    result = self._detect_objects(image)
                    predictions.update(result['predictions'])
                    annotations.extend(result['annotations'])
                
                elif analysis_type == 'segmentation':
                    result = self._segment_image(image)
                    predictions.update(result)
                
                elif analysis_type == 'clip':
                    result = self._analyze_with_clip(image)
                    predictions.update(result)
                
                elif analysis_type == 'feature_extraction':
                    features = self._extract_features(image)
                
            except Exception as e:
                self.logger.error(f"Error in {analysis_type} analysis: {e}")
        
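        # Overall confidence: mean of the probability-like (float) values only.
        # Detection class ids are stored as ints and must not enter the average.
        scores = [v for v in predictions.values() if isinstance(v, float)]
        confidence = np.mean(scores) if scores else 0.0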
        
        processing_time = time.time() - start_time
        
        return AnalysisResult(
            image_path=image_path,
            predictions=predictions,
            features=features,
            confidence=float(confidence) if not np.isnan(confidence) else 0.0,
            processing_time=processing_time,
            metadata=self._extract_image_metadata(image_path),
            annotations=annotations
        )
    
    def _classify_image(self, image: Image.Image) -> Dict[str, float]:
        """Classify the image and return the top-5 classes."""
        model = self.models['classification']
        transform = self.transforms['classification']
        
        # Preprocess
        input_tensor = transform(image).unsqueeze(0).to(self.device)
        
        # Inference
        with torch.no_grad():
            outputs = model(input_tensor)
            probabilities = F.softmax(outputs, dim=1)
            # Guard against models with fewer than five classes
            k = min(5, probabilities.shape[1])
            topk_prob, topk_indices = torch.topk(probabilities, k)
        
        # Return the top-k results keyed by class index
        results = {}
        for i in range(k):
            class_idx = topk_indices[0][i].item()
            prob = topk_prob[0][i].item()
            results[f'class_{class_idx}'] = prob
        
        return results
    
    def _detect_objects(self, image: Image.Image) -> Dict[str, Any]:
        """Detect objects in the image."""
        model = self.models['detection']
        transform = self.transforms['detection']
        
        # Preprocess: torchvision detection models take a list of 3D image tensors
        inputs = [transform(image).to(self.device)]
        
        # Inference
        with torch.no_grad():
            outputs = model(inputs)
        
        # Process the outputs
        predictions = {}
        annotations = []
        
        boxes = outputs[0]['boxes'].cpu().numpy()
        scores = outputs[0]['scores'].cpu().numpy()
        labels = outputs[0]['labels'].cpu().numpy()
        
        # Filter out low-confidence detections
        threshold = 0.5
        valid_indices = scores > threshold
        
        for i, (box, score, label) in enumerate(zip(
            boxes[valid_indices], 
            scores[valid_indices], 
            labels[valid_indices]
        )):
            predictions[f'object_{i}_class'] = int(label)
            predictions[f'object_{i}_confidence'] = float(score)
            
            annotations.append({
                'type': 'detection',
                'bbox': box.tolist(),
                'class': int(label),
                'confidence': float(score)
            })
        
        return {
            'predictions': predictions,
            'annotations': annotations
        }
    
    def _segment_image(self, image: Image.Image) -> Dict[str, float]:
        """Run semantic segmentation and summarize class coverage."""
        model = self.models['segmentation']
        transform = self.transforms['segmentation']
        
        # Preprocess
        input_tensor = transform(image).unsqueeze(0).to(self.device)
        
        # Inference
        with torch.no_grad():
            outputs = model(input_tensor)
            segmentation = outputs['out']
        
        # Soft per-class pixel share: sum class probabilities over all pixels
        segmentation = F.softmax(segmentation, dim=1)
        class_pixels = torch.sum(segmentation, dim=[2, 3])[0]
        total_pixels = torch.sum(class_pixels)
        class_ratios = class_pixels / total_pixels
        
        # Return the share of the dominant classes
        results = {}
        top_classes = torch.topk(class_ratios, min(5, class_ratios.numel()))
        
        for ratio, class_idx in zip(top_classes.values, top_classes.indices):
            results[f'segment_class_{class_idx.item()}'] = ratio.item()
        
        return results
    
    def _analyze_with_clip(self, image: Image.Image) -> Dict[str, float]:
        """Run multimodal analysis with CLIP."""
        model = self.models['clip']
        processor = self.processors['clip']
        
        # Candidate text descriptions for zero-shot matching
        text_descriptions = [
            "a photo of a person",
            "a photo of an animal",
            "a photo of a landscape",
            "a photo of a building",
            "a photo of food",
            "a photo of a vehicle",
            "a photo of nature",
            "a photo of technology",
            "a photo of art",
            "a photo of sports"
        ]
        
        # Prepare the inputs
        inputs = processor(
            text=text_descriptions,
            images=image,
            return_tensors="pt",
            padding=True
        )
        
        # Move the tensors to the target device
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        
        # Inference
        with torch.no_grad():
            outputs = model(**inputs)
            logits_per_image = outputs.logits_per_image
            probs = logits_per_image.softmax(dim=1)
        
        # Return one probability per text description
        results = {}
        for desc, prob in zip(text_descriptions, probs[0]):
            results[f'clip_{desc.replace(" ", "_")}'] = prob.item()
        
        return results
    
    def _extract_features(self, image: Image.Image) -> np.ndarray:
        """Extract a feature vector from the image."""
        model = self.models['feature_extraction']
        transform = self.transforms['feature_extraction']
        
        # Preprocess
        input_tensor = transform(image).unsqueeze(0).to(self.device)
        
        # Inference
        with torch.no_grad():
            features = model(input_tensor)
            features = features.squeeze().cpu().numpy()
        
        return features
    
    def _extract_image_metadata(self, image_path: str) -> Dict[str, Any]:
        """Extract image metadata with ExifTool."""
        try:
            from exiftool import ExifToolHelper
            
            with ExifToolHelper() as et:
                metadata = et.get_metadata(image_path)[0]
                
                # Pick out the key fields
                key_metadata = {
                    'file_size': metadata.get('File:FileSize', 0),
                    'image_width': metadata.get('File:ImageWidth', 0),
                    'image_height': metadata.get('File:ImageHeight', 0),
                    'camera_make': metadata.get('EXIF:Make', ''),
                    'camera_model': metadata.get('EXIF:Model', ''),
                    'lens_model': metadata.get('EXIF:LensModel', ''),
                    'focal_length': metadata.get('EXIF:FocalLength', 0),
                    'aperture': metadata.get('EXIF:FNumber', 0),
                    'iso': metadata.get('EXIF:ISO', 0),
                    # ShutterSpeed is a Composite tag; the EXIF group stores ExposureTime
                    'shutter_speed': metadata.get('EXIF:ExposureTime', ''),
                    'date_taken': metadata.get('EXIF:DateTimeOriginal', '')
                }
                
                return key_metadata
        
        except Exception as e:
            self.logger.error(f"Error extracting metadata: {e}")
            return {}
    
    def batch_analyze(self, image_paths: List[str], analysis_types: List[str] = None) -> List[AnalysisResult]:
        """Analyze a batch of images."""
        results = []
        
        for image_path in image_paths:
            try:
                result = self.analyze_image(image_path, analysis_types)
                results.append(result)
                self.logger.info(f"Analyzed: {image_path}")
            except Exception as e:
                self.logger.error(f"Error analyzing {image_path}: {e}")
        
        return results
    
    def save_analysis_results(self, results: List[AnalysisResult], output_path: str):
        """Save the analysis results."""
        # Convert to a JSON-serializable format
        serializable_results = []
        
        for result in results:
            serializable_result = {
                'image_path': result.image_path,
                'predictions': result.predictions,
                'features': result.features.tolist() if result.features is not None else None,
                'confidence': result.confidence,
                'processing_time': result.processing_time,
                'metadata': result.metadata,
                'annotations': result.annotations
            }
            serializable_results.append(serializable_result)
        
        # Write to a JSON file
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(serializable_results, f, indent=2, ensure_ascii=False)
        
        self.logger.info(f"Analysis results saved to: {output_path}")

# Usage example
def example_image_analysis():
    """Example of using the image analysis framework."""
    # Configuration
    config = {
        'models': {
            'classification': {
                'name': 'efficientnet_b0',
                'num_classes': 1000,
                'pretrained': True
            },
            'detection': {
                'name': 'fasterrcnn_resnet50_fpn'
            },
            'clip': {
                'name': 'openai/clip-vit-base-patch32'
            },
            'feature_extraction': {
                'name': 'resnet50'
            }
        }
    }
    
    # Create the analysis framework
    framework = ImageAnalysisFramework(config)
    
    # Analyze a single image
    image_path = "example_image.jpg"
    result = framework.analyze_image(
        image_path, 
        analysis_types=['classification', 'clip', 'feature_extraction']
    )
    
    print(f"Analysis result for {image_path}:")
    print(f"Confidence: {result.confidence:.3f}")
    print(f"Processing time: {result.processing_time:.3f}s")
    print(f"Predictions: {result.predictions}")
    
    # Batch analysis
    image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]
    results = framework.batch_analyze(image_paths)
    
    # Save the results
    framework.save_analysis_results(results, "analysis_results.json")

if __name__ == "__main__":
    example_image_analysis()
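
The predictions returned by `analyze_image` can feed the tagging workflow this chapter aims at. The following is a minimal sketch, assuming the CLIP-style keys produced by the listing above (`clip_a_photo_of_...`) and an illustrative threshold of 0.3. It turns confident predictions into keywords and builds an `exiftool` command line that appends them to `XMP-dc:Subject`. The helper names `predictions_to_keywords` and `build_exiftool_args`, the threshold, and the sample scores are all hypothetical; the command is only constructed, not executed.

```python
from typing import Dict, List

def predictions_to_keywords(predictions: Dict[str, float],
                            threshold: float = 0.3,
                            prefix: str = "clip_a_photo_of_") -> List[str]:
    """Keep CLIP-style predictions above the threshold and turn keys into keywords."""
    keywords = []
    ranked = sorted(predictions.items(), key=lambda kv: kv[1], reverse=True)
    for key, score in ranked:
        if not key.startswith(prefix) or score < threshold:
            continue
        # "clip_a_photo_of_a_landscape" -> "a_landscape" -> "landscape"
        label = key[len(prefix):]
        for article in ("a_", "an_"):
            if label.startswith(article):
                label = label[len(article):]
                break
        keywords.append(label.replace("_", " "))
    return keywords

def build_exiftool_args(image_path: str, keywords: List[str]) -> List[str]:
    """Build an exiftool command that appends each keyword to XMP-dc:Subject."""
    args = ["exiftool", "-overwrite_original"]
    # "+=" adds to a list-type tag instead of replacing its existing values
    args += [f"-XMP-dc:Subject+={kw}" for kw in keywords]
    args.append(image_path)
    return args

# Made-up scores for illustration only
preds = {
    "clip_a_photo_of_a_landscape": 0.62,
    "clip_a_photo_of_nature": 0.31,
    "clip_a_photo_of_an_animal": 0.04,
    "class_970": 0.55,  # classification output, ignored by the prefix filter
}
keywords = predictions_to_keywords(preds)
command = build_exiftool_args("example_image.jpg", keywords)
print(command)
```

Passing the list to `subprocess.run(command, check=True)` would apply the change. Because `-overwrite_original` suppresses the `_original` backup copy, leaving it out is the safer choice while experimenting.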

Computer Vision Integration

Advanced Image Processing
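
Several shape descriptors in the listing below are plain ratios. Compactness is computed there as 4πA / P² (area A, perimeter P), and extent as the shape's area divided by its bounding-box area. A minimal pure-Python sketch, with hypothetical helper names, checks both on shapes whose values are known in closed form:

```python
import math

def compactness(area: float, perimeter: float) -> float:
    """Isoperimetric quotient 4*pi*A / P^2: 1.0 for a circle, smaller otherwise."""
    if perimeter <= 0:
        return 0.0
    return 4 * math.pi * area / (perimeter ** 2)

def extent(area: float, bbox_w: float, bbox_h: float) -> float:
    """Fraction of the bounding box that the shape covers."""
    box = bbox_w * bbox_h
    return area / box if box > 0 else 0.0

# Circle of radius 10: A = pi * r^2, P = 2 * pi * r, bounding box 20 x 20
r = 10.0
circle_c = compactness(math.pi * r * r, 2 * math.pi * r)
circle_e = extent(math.pi * r * r, 2 * r, 2 * r)

# Square with side 10: A = 100, P = 40
square_c = compactness(100.0, 40.0)

# Thin 100 x 1 rectangle: A = 100, P = 202
thin_c = compactness(100.0, 202.0)

print(f"circle: compactness={circle_c:.3f}, extent={circle_e:.3f}")
print(f"square: compactness={square_c:.3f}")
print(f"thin rectangle: compactness={thin_c:.3f}")
```

The circle reaches the maximum compactness of 1, the square gives π/4, and elongated shapes fall toward 0, which is why the ratio is useful as a scale-independent roundness measure.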

# File path: ai/computer_vision.py
import cv2
import numpy as np
import torch
import torch.nn as nn
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass
import logging
from sklearn.cluster import KMeans, DBSCAN
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from scipy import ndimage
from skimage import feature, measure, segmentation, filters
from skimage.feature import local_binary_pattern, hog
from skimage.segmentation import slic, felzenszwalb
from skimage.measure import regionprops
import albumentations as A

@dataclass
class VisionAnalysisResult:
    """Result of a computer vision analysis."""
    image_path: str
    features: Dict[str, Any]
    objects: List[Dict[str, Any]]
    regions: List[Dict[str, Any]]
    quality_metrics: Dict[str, float]
    enhancement_suggestions: List[str]
    processing_time: float

class ComputerVisionProcessor:
    """Computer vision processor."""
    
    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}
        self.logger = logging.getLogger(f"{__name__}.ComputerVisionProcessor")
        
        # Initialize the detectors and analyzers
        self._initialize_detectors()
    
    def _initialize_detectors(self):
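        """Initialize the detectors."""
        # Edge detectors
        self.edge_detectors = {
            'canny': self._canny_edge_detection,
            'sobel': self._sobel_edge_detection,
            'laplacian': self._laplacian_edge_detection
        }
        
        # SURF lives in the nonfree part of opencv-contrib. The module may be
        # missing, or SURF_create() may raise, so fall back to None.
        try:
            surf = cv2.xfeatures2d.SURF_create()
        except (AttributeError, cv2.error):
            surf = None
        
        # Feature detectors
        self.feature_detectors = {
            'sift': cv2.SIFT_create(),
            'orb': cv2.ORB_create(),
            'surf': surf
        }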
        
        # Segmentation algorithms
        self.segmentation_methods = {
            'watershed': self._watershed_segmentation,
            'slic': self._slic_segmentation,
            'felzenszwalb': self._felzenszwalb_segmentation
        }
    
    def analyze_image(self, image_path: str) -> VisionAnalysisResult:
        """Run the full analysis on one image."""
        import time
        start_time = time.time()
        
        # Load the image (OpenCV returns BGR channel order)
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Cannot load image: {image_path}")
        
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        
        # Run the individual analyses
        features = self._extract_comprehensive_features(image, gray)
        objects = self._detect_objects_cv(image, gray)
        regions = self._analyze_regions(image, gray)
        quality_metrics = self._assess_image_quality(image, gray)
        enhancement_suggestions = self._generate_enhancement_suggestions(quality_metrics)
        
        processing_time = time.time() - start_time
        
        return VisionAnalysisResult(
            image_path=image_path,
            features=features,
            objects=objects,
            regions=regions,
            quality_metrics=quality_metrics,
            enhancement_suggestions=enhancement_suggestions,
            processing_time=processing_time
        )
    
    def _extract_comprehensive_features(self, image: np.ndarray, gray: np.ndarray) -> Dict[str, Any]:
        """Extract a comprehensive feature set."""
        features = {}
        
        # Basic statistical features
        features['color_stats'] = self._extract_color_statistics(image)
        features['texture_features'] = self._extract_texture_features(gray)
        features['shape_features'] = self._extract_shape_features(gray)
        features['frequency_features'] = self._extract_frequency_features(gray)
        
        # Higher-level features
        features['keypoints'] = self._extract_keypoint_features(gray)
        features['edges'] = self._extract_edge_features(gray)
        features['corners'] = self._extract_corner_features(gray)
        
        return features
    
    def _extract_color_statistics(self, image: np.ndarray) -> Dict[str, Any]:
        """Extract color statistics."""
        # Convert to other color spaces
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        
        stats = {}
        
        # BGR statistics
        for i, channel in enumerate(['B', 'G', 'R']):
            channel_data = image[:, :, i]
            stats[f'{channel}_mean'] = float(np.mean(channel_data))
            stats[f'{channel}_std'] = float(np.std(channel_data))
            stats[f'{channel}_min'] = int(np.min(channel_data))
            stats[f'{channel}_max'] = int(np.max(channel_data))
        
        # HSV statistics
        for i, channel in enumerate(['H', 'S', 'V']):
            channel_data = hsv[:, :, i]
            stats[f'{channel}_mean'] = float(np.mean(channel_data))
            stats[f'{channel}_std'] = float(np.std(channel_data))
        
        # Color histograms
        hist_b = cv2.calcHist([image], [0], None, [256], [0, 256])
        hist_g = cv2.calcHist([image], [1], None, [256], [0, 256])
        hist_r = cv2.calcHist([image], [2], None, [256], [0, 256])
        
        stats['color_histogram'] = {
            'B': hist_b.flatten().tolist(),
            'G': hist_g.flatten().tolist(),
            'R': hist_r.flatten().tolist()
        }
        
        # Dominant colors
        stats['dominant_colors'] = self._extract_dominant_colors(image)
        
        return stats
    
    def _extract_dominant_colors(self, image: np.ndarray, k: int = 5) -> List[List[int]]:
        """Extract the k dominant colors (in BGR order)."""
        # Flatten the image into a list of pixels
        data = image.reshape((-1, 3))
        data = np.float32(data)
        
        # K-means clustering
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 20, 1.0)
        _, labels, centers = cv2.kmeans(data, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)
        
        # Convert the cluster centers to integers and return them
        centers = np.uint8(centers)
        return centers.tolist()
    
    def _extract_texture_features(self, gray: np.ndarray) -> Dict[str, Any]:
        """Extract texture features."""
        features = {}
        
        # LBP (Local Binary Pattern)
        radius = 3
        n_points = 8 * radius
        lbp = local_binary_pattern(gray, n_points, radius, method='uniform')
        
        # LBP histogram
        lbp_hist, _ = np.histogram(lbp.ravel(), bins=n_points + 2, range=(0, n_points + 2))
        features['lbp_histogram'] = lbp_hist.tolist()
        
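        # GLCM (Gray Level Co-occurrence Matrix) features.
        # scikit-image 0.19 renamed greycomatrix/greycoprops to the "gray" spelling.
        try:
            from skimage.feature import graycomatrix, graycoprops
        except ImportError:
            from skimage.feature import greycomatrix as graycomatrix
            from skimage.feature import greycoprops as graycoprops
        
        # Compute the GLCM for several distances and directions
        distances = [1, 2, 3]
        angles = [0, 45, 90, 135]
        
        glcm = graycomatrix(
            gray, distances=distances, angles=np.radians(angles),
            levels=256, symmetric=True, normed=True
        )
        
        # Extract the GLCM properties
        properties = ['contrast', 'dissimilarity', 'homogeneity', 'energy']
        for prop in properties:
            values = graycoprops(glcm, prop)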
            features[f'glcm_{prop}_mean'] = float(np.mean(values))
            features[f'glcm_{prop}_std'] = float(np.std(values))
        
        # HOG features
        hog_features = hog(
            gray, orientations=9, pixels_per_cell=(8, 8),
            cells_per_block=(2, 2), visualize=False
        )
        features['hog_mean'] = float(np.mean(hog_features))
        features['hog_std'] = float(np.std(hog_features))
        
        return features
    
    def _extract_shape_features(self, gray: np.ndarray) -> Dict[str, Any]:
        """Extract shape features from the largest contour."""
        features = {}
        
        # Binarize with Otsu's threshold
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        
        # Find the contours
        contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        if contours:
            # Largest contour
            largest_contour = max(contours, key=cv2.contourArea)
            
            # Contour features
            area = cv2.contourArea(largest_contour)
            perimeter = cv2.arcLength(largest_contour, True)
            
            if perimeter > 0:
                features['shape_area'] = float(area)
                features['shape_perimeter'] = float(perimeter)
                features['shape_compactness'] = float(4 * np.pi * area / (perimeter ** 2))
            
            # Bounding box
            x, y, w, h = cv2.boundingRect(largest_contour)
            features['bbox_aspect_ratio'] = float(w / h) if h > 0 else 0.0
            
            # Convex hull
            hull = cv2.convexHull(largest_contour)
            hull_area = cv2.contourArea(hull)
            if hull_area > 0:
                features['convexity'] = float(area / hull_area)
        
        return features
    
    def _extract_frequency_features(self, gray: np.ndarray) -> Dict[str, Any]:
        """Extract frequency-domain features."""
        features = {}
        
        # FFT
        f_transform = np.fft.fft2(gray)
        f_shift = np.fft.fftshift(f_transform)
        magnitude_spectrum = np.log(np.abs(f_shift) + 1)
        
        # Frequency-domain statistics
        features['fft_mean'] = float(np.mean(magnitude_spectrum))
        features['fft_std'] = float(np.std(magnitude_spectrum))
        features['fft_energy'] = float(np.sum(magnitude_spectrum ** 2))
        
        # Radial frequency distribution
        h, w = gray.shape
        center = (h // 2, w // 2)
        y, x = np.ogrid[:h, :w]
        radius = np.sqrt((x - center[1]) ** 2 + (y - center[0]) ** 2)
        
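        # Energy inside disks of increasing radius around the spectrum center.
        # Each mask includes all lower frequencies, so the values are cumulative.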
        max_radius = min(center)
        for i, r in enumerate([max_radius // 4, max_radius // 2, 3 * max_radius // 4]):
            mask = radius <= r
            energy = np.sum(magnitude_spectrum[mask] ** 2)
            features[f'radial_energy_band_{i}'] = float(energy)
        
        return features
    
    def _extract_keypoint_features(self, gray: np.ndarray) -> Dict[str, Any]:
        """Extract keypoint features."""
        features = {}
        
        for name, detector in self.feature_detectors.items():
            if detector is None:
                continue
            
            try:
                # SIFT, ORB and SURF all expose the same detectAndCompute API
                keypoints, descriptors = detector.detectAndCompute(gray, None)
                
                features[f'{name}_keypoints_count'] = len(keypoints)
                
                if descriptors is not None:
                    features[f'{name}_descriptors_mean'] = float(np.mean(descriptors))
                    features[f'{name}_descriptors_std'] = float(np.std(descriptors))
                
            except Exception as e:
                self.logger.warning(f"Error extracting {name} features: {e}")
        
        return features
    
    def _extract_edge_features(self, gray: np.ndarray) -> Dict[str, Any]:
        """Extract edge features."""
        features = {}
        
        for name, detector in self.edge_detectors.items():
            try:
                edges = detector(gray)
                
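                # Edge statistics. Sobel and Laplacian return float responses
                # (Laplacian is signed), so any nonzero magnitude counts as an
                # edge pixel here; a threshold would give a stricter density.
                edge_pixels = np.sum(np.abs(edges) > 0)
                total_pixels = edges.size
                edge_density = edge_pixels / total_pixels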
                
                features[f'{name}_edge_density'] = float(edge_density)
                features[f'{name}_edge_count'] = int(edge_pixels)
                
            except Exception as e:
                self.logger.warning(f"Error extracting {name} edges: {e}")
        
        return features
    
    def _extract_corner_features(self, gray: np.ndarray) -> Dict[str, Any]:
        """Extract corner features."""
        features = {}
        
        # Harris corner detection
        harris_corners = cv2.cornerHarris(gray, 2, 3, 0.04)
        harris_count = np.sum(harris_corners > 0.01 * harris_corners.max())
        features['harris_corners_count'] = int(harris_count)
        
        # Shi-Tomasi corner detection
        corners = cv2.goodFeaturesToTrack(gray, 100, 0.01, 10)
        features['shi_tomasi_corners_count'] = len(corners) if corners is not None else 0
        
        return features
    
    def _canny_edge_detection(self, gray: np.ndarray) -> np.ndarray:
        """Canny edge detection."""
        return cv2.Canny(gray, 50, 150)
    
    def _sobel_edge_detection(self, gray: np.ndarray) -> np.ndarray:
        """Sobel edge detection (gradient magnitude)."""
        sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
        sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
        return np.sqrt(sobelx**2 + sobely**2)
    
    def _laplacian_edge_detection(self, gray: np.ndarray) -> np.ndarray:
        """Laplacian edge detection."""
        return cv2.Laplacian(gray, cv2.CV_64F)
    
    def _detect_objects_cv(self, image: np.ndarray, gray: np.ndarray) -> List[Dict[str, Any]]:
        """Detect objects with classical computer vision methods."""
        objects = []
        
        # Detect objects from the contours of the Otsu-thresholded image
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        for i, contour in enumerate(contours):
            area = cv2.contourArea(contour)
            
            # Skip small objects
            if area < 100:
                continue
            
            # Bounding box
            x, y, w, h = cv2.boundingRect(contour)
            
            # Object features
            perimeter = cv2.arcLength(contour, True)
            aspect_ratio = w / h if h > 0 else 0
            extent = area / (w * h) if w * h > 0 else 0
            
            # Convex hull
            hull = cv2.convexHull(contour)
            hull_area = cv2.contourArea(hull)
            solidity = area / hull_area if hull_area > 0 else 0
            
            objects.append({
                'id': i,
                'bbox': [int(x), int(y), int(w), int(h)],
                'area': float(area),
                'perimeter': float(perimeter),
                'aspect_ratio': float(aspect_ratio),
                'extent': float(extent),
                'solidity': float(solidity),
                'type': 'contour_object'
            })
        
        return objects
    
    def _analyze_regions(self, image: np.ndarray, gray: np.ndarray) -> List[Dict[str, Any]]:
        """Analyze image regions (superpixels)."""
        regions = []

        # SLIC superpixel segmentation
        segments = slic(image, n_segments=100, compactness=10, sigma=1)

        # Analyze each region
        for region_id in np.unique(segments):
            mask = segments == region_id

            # Region properties
            props = measure.regionprops(mask.astype(int))[0]

            # Color statistics
            region_pixels = image[mask]
            color_mean = np.mean(region_pixels, axis=0)
            color_std = np.std(region_pixels, axis=0)
            
            regions.append({
                'id': int(region_id),
                'area': float(props.area),
                'centroid': [float(props.centroid[0]), float(props.centroid[1])],
                'bbox': [float(x) for x in props.bbox],
                'eccentricity': float(props.eccentricity),
                'solidity': float(props.solidity),
                'color_mean': color_mean.tolist(),
                'color_std': color_std.tolist(),
                'type': 'superpixel_region'
            })
        
        return regions
    
    def _watershed_segmentation(self, gray: np.ndarray) -> np.ndarray:
        """Watershed segmentation."""
        # Otsu binarization followed by a distance transform
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        dist_transform = cv2.distanceTransform(binary, cv2.DIST_L2, 5)

        # Sure foreground: pixels whose distance exceeds 70% of the maximum
        _, sure_fg = cv2.threshold(dist_transform, 0.7 * dist_transform.max(), 255, 0)
        sure_fg = np.uint8(sure_fg)

        # Build markers: label the sure foreground and set the unknown band to 0
        unknown = cv2.subtract(binary, sure_fg)
        _, markers = cv2.connectedComponents(sure_fg)
        markers = markers + 1
        markers[unknown == 255] = 0

        # Apply the watershed algorithm
        image_color = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
        markers = cv2.watershed(image_color, markers)
        
        return markers
    
    def _slic_segmentation(self, image: np.ndarray) -> np.ndarray:
        """SLIC superpixel segmentation."""
        return slic(image, n_segments=100, compactness=10, sigma=1)
    
    def _felzenszwalb_segmentation(self, image: np.ndarray) -> np.ndarray:
        """Felzenszwalb segmentation."""
        return felzenszwalb(image, scale=100, sigma=0.5, min_size=50)
    
    def _assess_image_quality(self, image: np.ndarray, gray: np.ndarray) -> Dict[str, float]:
        """Assess image quality."""
        metrics = {}

        # Sharpness (variance of the Laplacian)
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        metrics['sharpness'] = float(laplacian_var)

        # Contrast (standard deviation of gray levels)
        contrast = np.std(gray)
        metrics['contrast'] = float(contrast)

        # Brightness (mean gray level)
        brightness = np.mean(gray)
        metrics['brightness'] = float(brightness)

        # Noise estimate (high-frequency response of a Laplacian-like kernel)
        kernel = np.array([[-1, -1, -1], [-1, 8, -1], [-1, -1, -1]])
        noise = cv2.filter2D(gray, -1, kernel)
        noise_level = np.std(noise)
        metrics['noise_level'] = float(noise_level)

        # Saturation (mean of the S channel in HSV space)
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        saturation = np.mean(hsv[:, :, 1])
        metrics['saturation'] = float(saturation)

        # Color richness (number of unique colors per pixel)
        unique_colors = len(np.unique(image.reshape(-1, image.shape[2]), axis=0))
        total_pixels = image.shape[0] * image.shape[1]
        color_richness = unique_colors / total_pixels
        metrics['color_richness'] = float(color_richness)

        # Exposure assessment
        hist = cv2.calcHist([gray], [0], None, [256], [0, 256])

        # Overexposure (fraction of pixels with gray level >= 240)
        overexposed = np.sum(hist[240:]) / total_pixels
        metrics['overexposure'] = float(overexposed)

        # Underexposure (fraction of pixels with gray level < 16)
        underexposed = np.sum(hist[:16]) / total_pixels
        metrics['underexposure'] = float(underexposed)
        
        return metrics
    
    def _generate_enhancement_suggestions(self, quality_metrics: Dict[str, float]) -> List[str]:
        """Generate image enhancement suggestions."""
        suggestions = []

        # Sharpness
        if quality_metrics.get('sharpness', 0) < 100:
            suggestions.append("Image may be blurry; consider applying a sharpening filter")

        # Contrast
        if quality_metrics.get('contrast', 0) < 30:
            suggestions.append("Contrast is low; consider increasing contrast")
        elif quality_metrics.get('contrast', 0) > 80:
            suggestions.append("Contrast is too high; consider reducing contrast")

        # Brightness
        brightness = quality_metrics.get('brightness', 128)
        if brightness < 80:
            suggestions.append("Image is dark; consider increasing brightness")
        elif brightness > 180:
            suggestions.append("Image is bright; consider reducing brightness")

        # Noise
        if quality_metrics.get('noise_level', 0) > 20:
            suggestions.append("Noise detected; consider applying a denoising filter")

        # Saturation
        saturation = quality_metrics.get('saturation', 128)
        if saturation < 50:
            suggestions.append("Saturation is low; consider boosting color saturation")
        elif saturation > 200:
            suggestions.append("Saturation is too high; consider reducing color saturation")

        # Exposure
        if quality_metrics.get('overexposure', 0) > 0.05:
            suggestions.append("Overexposed regions detected; consider lowering exposure or using HDR processing")

        if quality_metrics.get('underexposure', 0) > 0.1:
            suggestions.append("Underexposed regions detected; consider lifting shadows or applying exposure compensation")
        
        return suggestions

# Usage example
def example_computer_vision():
    """Usage example for the computer vision processor."""
    # Create the processor
    processor = ComputerVisionProcessor()

    # Analyze an image
    image_path = "example_image.jpg"
    result = processor.analyze_image(image_path)
    
    print(f"Vision analysis for {image_path}:")
    print(f"Processing time: {result.processing_time:.3f}s")
    print(f"Objects detected: {len(result.objects)}")
    print(f"Regions analyzed: {len(result.regions)}")
    print(f"Quality metrics: {result.quality_metrics}")
    print(f"Enhancement suggestions: {result.enhancement_suggestions}")
    
    # Show a feature summary
    print("\nFeature summary:")
    for category, features in result.features.items():
        if isinstance(features, dict):
            print(f"  {category}: {len(features)} features")
        else:
            print(f"  {category}: {features}")

if __name__ == "__main__":
    example_computer_vision()
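
The exposure checks in the listing above reduce to two histogram fractions compared against fixed thresholds. The following is a minimal sketch of that idea in pure Python, assuming 8-bit gray levels supplied as a flat list. The helper names `exposure_fractions` and `exposure_suggestions` are hypothetical and not part of the chapter's classes; the bin limits (16 and 240) and thresholds (0.1 and 0.05) are taken from the listing.

```python
from typing import Dict, List


def exposure_fractions(pixels: List[int], low: int = 16, high: int = 240) -> Dict[str, float]:
    """Return the share of pixels in the darkest and brightest histogram bins."""
    if not pixels:
        raise ValueError("pixels must not be empty")
    # 256-bin histogram of 8-bit gray levels
    hist = [0] * 256
    for value in pixels:
        hist[value] += 1
    total = len(pixels)
    return {
        "underexposure": sum(hist[:low]) / total,   # gray levels 0..low-1
        "overexposure": sum(hist[high:]) / total,   # gray levels high..255
    }


def exposure_suggestions(fractions: Dict[str, float]) -> List[str]:
    """Apply the same thresholds as the listing: 5% highlights, 10% shadows."""
    suggestions = []
    if fractions["overexposure"] > 0.05:
        suggestions.append("Overexposed regions detected")
    if fractions["underexposure"] > 0.1:
        suggestions.append("Underexposed regions detected")
    return suggestions


# Ten sample pixels: two black, seven mid-gray, one white
sample = [0] * 2 + [128] * 7 + [255]
fractions = exposure_fractions(sample)
print(fractions)
print(exposure_suggestions(fractions))
```

Because both values are fractions of the total pixel count, the thresholds do not depend on image resolution.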

Intelligent Image Processing

AI-Driven Image Enhancement

# File path: ai/intelligent_processing.py
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass
import logging
from PIL import Image, ImageEnhance, ImageFilter
import albumentations as A
from albumentations.pytorch import ToTensorV2
import torchvision.transforms as transforms
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns

@dataclass
class ProcessingResult:
    """Image processing result."""
    original_path: str
    processed_image: np.ndarray
    enhancement_type: str
    quality_improvement: Dict[str, float]
    processing_time: float
    confidence_score: float

class IntelligentImageProcessor:
    """Intelligent image processor."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.logger = logging.getLogger(f"{__name__}.IntelligentImageProcessor")

        # Initialize processors
        self._initialize_processors()

        # Load pretrained models
        self._load_models()

    def _initialize_processors(self):
        """Initialize enhancement methods and filters."""
        # Image enhancement methods
        self.enhancement_methods = {
            'auto_contrast': self._auto_contrast_enhancement,
            'histogram_equalization': self._histogram_equalization,
            'adaptive_histogram': self._adaptive_histogram_equalization,
            'gamma_correction': self._gamma_correction,
            'unsharp_masking': self._unsharp_masking,
            'noise_reduction': self._noise_reduction,
            'super_resolution': self._super_resolution,
            'hdr_tone_mapping': self._hdr_tone_mapping
        }
        
        # Filter collection
        self.filters = {
            'gaussian_blur': lambda img, sigma: cv2.GaussianBlur(img, (0, 0), sigma),
            'bilateral_filter': lambda img: cv2.bilateralFilter(img, 9, 75, 75),
            'median_filter': lambda img, k: cv2.medianBlur(img, k),
            'morphological_opening': self._morphological_opening,
            'morphological_closing': self._morphological_closing
        }
    
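    def _load_models(self):
        """Load pretrained models."""
        try:
            # Pretrained super-resolution, denoising, or enhancement models
            # (for example ESRGAN or RCAN) can be loaded here.
            self.super_resolution_model = None  # placeholder; load a real model in production
            self.denoising_model = None
            self.enhancement_model = None

            self.logger.info("Model placeholders initialized (no pretrained weights loaded)")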
        except Exception as e:
            self.logger.warning(f"Failed to load some models: {e}")
    
    def intelligent_enhance(self, image_path: str, enhancement_type: str = 'auto') -> ProcessingResult:
        """Enhance an image, choosing the method automatically when requested."""
        import time
        start_time = time.time()

        # Load the image
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Cannot load image: {image_path}")

        # Analyze image quality
        quality_metrics = self._analyze_image_quality(image)

        # Select the enhancement method
        if enhancement_type == 'auto':
            enhancement_type = self._select_best_enhancement(quality_metrics)

        # Apply the enhancement
        enhanced_image = self._apply_enhancement(image, enhancement_type, quality_metrics)

        # Evaluate the improvement
        quality_improvement = self._evaluate_improvement(image, enhanced_image)

        # Compute the confidence score
        confidence_score = self._calculate_confidence(quality_improvement)
        
        processing_time = time.time() - start_time
        
        return ProcessingResult(
            original_path=image_path,
            processed_image=enhanced_image,
            enhancement_type=enhancement_type,
            quality_improvement=quality_improvement,
            processing_time=processing_time,
            confidence_score=confidence_score
        )
    
    def _analyze_image_quality(self, image: np.ndarray) -> Dict[str, float]:
        """Analyze image quality."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        metrics = {}

        # Sharpness (variance of the Laplacian)
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        metrics['sharpness'] = float(laplacian_var)

        # Contrast (standard deviation of gray levels)
        metrics['contrast'] = float(np.std(gray))

        # Brightness (mean gray level)
        metrics['brightness'] = float(np.mean(gray))

        # Noise level (high-frequency response)
        noise_kernel = np.array([[-1, -1, -1], [-1, 8, -1], [-1, -1, -1]])
        noise = cv2.filter2D(gray, -1, noise_kernel)
        metrics['noise_level'] = float(np.std(noise))

        # Saturation (mean of the S channel in HSV space)
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        metrics['saturation'] = float(np.mean(hsv[:, :, 1]))

        # Dynamic range
        metrics['dynamic_range'] = float(np.max(gray) - np.min(gray))

        # Entropy (information content of the gray-level histogram)
        hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
        hist = hist / hist.sum()
        entropy = -np.sum(hist * np.log2(hist + 1e-10))
        metrics['entropy'] = float(entropy)
        
        return metrics
    
    def _select_best_enhancement(self, quality_metrics: Dict[str, float]) -> str:
        """Select the enhancement method."""
        # Rule-based choice; the first matching quality issue wins
        if quality_metrics['sharpness'] < 100:
            return 'unsharp_masking'
        elif quality_metrics['contrast'] < 30:
            return 'adaptive_histogram'
        elif quality_metrics['noise_level'] > 20:
            return 'noise_reduction'
        elif quality_metrics['brightness'] < 80:
            return 'gamma_correction'
        elif quality_metrics['dynamic_range'] < 200:
            return 'auto_contrast'
        else:
            return 'histogram_equalization'
    
    def _apply_enhancement(self, image: np.ndarray, enhancement_type: str,
                         quality_metrics: Dict[str, float]) -> np.ndarray:
        """Apply the selected enhancement."""
        if enhancement_type in self.enhancement_methods:
            return self.enhancement_methods[enhancement_type](image, quality_metrics)
        else:
            self.logger.warning(f"Unknown enhancement type: {enhancement_type}")
            return image
    
    def _auto_contrast_enhancement(self, image: np.ndarray, metrics: Dict[str, float]) -> np.ndarray:
        """Automatic contrast enhancement (CLAHE on the L channel)."""
        # Convert to the LAB color space
        lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)

        # Apply CLAHE to the L channel
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        l = clahe.apply(l)

        # Merge the channels
        enhanced = cv2.merge([l, a, b])
        enhanced = cv2.cvtColor(enhanced, cv2.COLOR_LAB2BGR)
        
        return enhanced
    
    def _histogram_equalization(self, image: np.ndarray, metrics: Dict[str, float]) -> np.ndarray:
        """Histogram equalization."""
        # Convert to the YUV color space
        yuv = cv2.cvtColor(image, cv2.COLOR_BGR2YUV)

        # Equalize the histogram of the Y (luma) channel
        yuv[:, :, 0] = cv2.equalizeHist(yuv[:, :, 0])

        # Convert back to BGR
        enhanced = cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR)
        
        return enhanced
    
    def _adaptive_histogram_equalization(self, image: np.ndarray, metrics: Dict[str, float]) -> np.ndarray:
        """Adaptive histogram equalization."""
        # Convert to the LAB color space
        lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)

        # Create the CLAHE object; use a higher clip limit for very low contrast
        clip_limit = 3.0 if metrics['contrast'] < 20 else 2.0
        clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8))

        # Apply CLAHE
        l = clahe.apply(l)

        # Merge the channels
        enhanced = cv2.merge([l, a, b])
        enhanced = cv2.cvtColor(enhanced, cv2.COLOR_LAB2BGR)
        
        return enhanced
    
    def _gamma_correction(self, image: np.ndarray, metrics: Dict[str, float]) -> np.ndarray:
        """Gamma correction."""
        # Choose the exponent from the mean brightness. The exponent is applied
        # directly to normalized pixel values, so values below 1 brighten
        # and values above 1 darken.
        brightness = metrics['brightness']
        if brightness < 80:
            gamma = 0.7  # brighten
        elif brightness > 180:
            gamma = 1.3  # darken
        else:
            gamma = 1.0  # unchanged

        # Build the lookup table
        table = np.array([((i / 255.0) ** gamma) * 255 for i in np.arange(0, 256)]).astype("uint8")

        # Apply gamma correction
        enhanced = cv2.LUT(image, table)
        
        return enhanced
    
    def _unsharp_masking(self, image: np.ndarray, metrics: Dict[str, float]) -> np.ndarray:
        """Unsharp masking."""
        # Blurred copy of the image
        gaussian = cv2.GaussianBlur(image, (0, 0), 2.0)

        # Sharpen: image + strength * (image - blurred)
        unsharp_strength = 1.5 if metrics['sharpness'] < 50 else 1.2
        enhanced = cv2.addWeighted(image, 1 + unsharp_strength, gaussian, -unsharp_strength, 0)
        
        return enhanced
    
    def _noise_reduction(self, image: np.ndarray, metrics: Dict[str, float]) -> np.ndarray:
        """Noise reduction."""
        # Choose the filter from the noise level
        noise_level = metrics['noise_level']

        if noise_level > 30:
            # Heavy noise: non-local means denoising
            enhanced = cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21)
        elif noise_level > 15:
            # Moderate noise: bilateral filter
            enhanced = cv2.bilateralFilter(image, 9, 75, 75)
        else:
            # Light noise: Gaussian filter
            enhanced = cv2.GaussianBlur(image, (3, 3), 0.5)
        
        return enhanced
    
    def _super_resolution(self, image: np.ndarray, metrics: Dict[str, float]) -> np.ndarray:
        """Super-resolution enhancement."""
        # Simple 2x upscaling with bicubic interpolation
        height, width = image.shape[:2]
        enhanced = cv2.resize(image, (width * 2, height * 2), interpolation=cv2.INTER_CUBIC)

        # If a pretrained super-resolution model is available, apply it here
        # if self.super_resolution_model:
        #     enhanced = self._apply_sr_model(enhanced)
        
        return enhanced
    
    def _hdr_tone_mapping(self, image: np.ndarray, metrics: Dict[str, float]) -> np.ndarray:
        """HDR tone mapping."""
        # Convert to float in [0, 1]
        image_float = image.astype(np.float32) / 255.0

        # Create the tonemap object
        tonemap = cv2.createTonemapDrago(gamma=2.2, saturation=1.0, bias=0.85)

        # Apply tone mapping
        enhanced = tonemap.process(image_float)

        # Convert back to uint8
        enhanced = np.clip(enhanced * 255, 0, 255).astype(np.uint8)
        
        return enhanced
    
    def _morphological_opening(self, image: np.ndarray) -> np.ndarray:
        """Morphological opening."""
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        return cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel)
    
    def _morphological_closing(self, image: np.ndarray) -> np.ndarray:
        """Morphological closing."""
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        return cv2.morphologyEx(image, cv2.MORPH_CLOSE, kernel)
    
    def _evaluate_improvement(self, original: np.ndarray, enhanced: np.ndarray) -> Dict[str, float]:
        """Evaluate the improvement between the original and enhanced images."""
        original_metrics = self._analyze_image_quality(original)
        enhanced_metrics = self._analyze_image_quality(enhanced)

        improvement = {}
        for key in original_metrics:
            if key in ['sharpness', 'contrast', 'saturation', 'dynamic_range', 'entropy']:
                # Higher is better for these metrics
                improvement[f'{key}_improvement'] = enhanced_metrics[key] - original_metrics[key]
            elif key in ['noise_level']:
                # Lower is better for the noise level
                improvement[f'{key}_improvement'] = original_metrics[key] - enhanced_metrics[key]
            else:
                # Other metrics such as brightness: relative change
                if original_metrics[key] != 0:
                    improvement[f'{key}_improvement'] = (enhanced_metrics[key] - original_metrics[key]) / original_metrics[key]
                else:
                    improvement[f'{key}_improvement'] = 0.0
        
        return improvement
    
    def _calculate_confidence(self, improvement: Dict[str, float]) -> float:
        """Compute the confidence score."""
        # Confidence is the fraction of metrics that improved
        positive_improvements = 0
        total_improvements = 0
        
        for key, value in improvement.items():
            if 'improvement' in key:
                total_improvements += 1
                if value > 0:
                    positive_improvements += 1
        
        if total_improvements == 0:
            return 0.0
        
        confidence = positive_improvements / total_improvements
        return confidence
    
    def batch_process(self, image_paths: List[str],
                     enhancement_type: str = 'auto') -> List[ProcessingResult]:
        """Process a batch of images."""
        results = []
        
        for image_path in image_paths:
            try:
                result = self.intelligent_enhance(image_path, enhancement_type)
                results.append(result)
                self.logger.info(f"Processed {image_path} successfully")
            except Exception as e:
                self.logger.error(f"Failed to process {image_path}: {e}")
        
        return results
    
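    def save_result(self, result: ProcessingResult, output_path: str):
        """Save the processed image and a text summary."""
        import os

        if not cv2.imwrite(output_path, result.processed_image):
            raise IOError(f"Failed to write image: {output_path}")

        # Write the summary next to the image. Deriving the name from the stem
        # keeps an unexpected extension (e.g. .jpeg) from overwriting the image.
        info_path = os.path.splitext(output_path)[0] + '_info.txt'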
        with open(info_path, 'w', encoding='utf-8') as f:
            f.write(f"Original: {result.original_path}\n")
            f.write(f"Enhancement: {result.enhancement_type}\n")
            f.write(f"Processing time: {result.processing_time:.3f}s\n")
            f.write(f"Confidence: {result.confidence_score:.3f}\n")
            f.write("\nQuality improvements:\n")
            for key, value in result.quality_improvement.items():
                f.write(f"  {key}: {value:.3f}\n")

# Usage example
def example_intelligent_processing():
    """Usage example for intelligent processing."""
    # Create the processor
    processor = IntelligentImageProcessor()

    # Process a single image
    image_path = "example_image.jpg"
    result = processor.intelligent_enhance(image_path, 'auto')

    print(f"Intelligent processing for {image_path}:")
    print(f"Enhancement type: {result.enhancement_type}")
    print(f"Processing time: {result.processing_time:.3f}s")
    print(f"Confidence score: {result.confidence_score:.3f}")
    print(f"Quality improvements: {result.quality_improvement}")

    # Save the result
    processor.save_result(result, "enhanced_image.jpg")

    # Batch processing
    image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]
    batch_results = processor.batch_process(image_paths)
    
    print(f"\nBatch processing completed: {len(batch_results)} images processed")

if __name__ == "__main__":
    example_intelligent_processing()
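
Gamma correction in the listing above is a 256-entry lookup table applied to every pixel. The sketch below builds such a table in pure Python to show which way the exponent moves mid-tones. It assumes the exponent is applied directly to the normalized value, so an exponent below 1 brightens and one above 1 darkens. The helper names `gamma_table` and `apply_table` are hypothetical, and the sketch rounds table entries where the listing truncates to `uint8`.

```python
from typing import List


def gamma_table(exponent: float) -> List[int]:
    """Build a 256-entry lookup table for out = 255 * (in / 255) ** exponent."""
    if exponent <= 0:
        raise ValueError("exponent must be positive")
    # round() avoids off-by-one entries caused by floating-point truncation
    return [min(255, round(((i / 255.0) ** exponent) * 255)) for i in range(256)]


def apply_table(pixels: List[int], table: List[int]) -> List[int]:
    """Map every 8-bit pixel value through the lookup table."""
    return [table[p] for p in pixels]


brighten = gamma_table(0.7)   # exponent < 1 lifts mid-tones
darken = gamma_table(1.3)     # exponent > 1 lowers mid-tones
identity = gamma_table(1.0)   # exponent 1 leaves values unchanged

mid_gray = [128]
print("brightened:", apply_table(mid_gray, brighten))
print("darkened:", apply_table(mid_gray, darken))
print("unchanged:", apply_table(mid_gray, identity))
```

Black (0) and white (255) map to themselves for any positive exponent, so the correction only redistributes the tones in between.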

Real-Time Analysis System

Streaming Image Analysis

# File path: ai/realtime_analysis.py
import cv2
import numpy as np
import threading
import queue
import time
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass
import logging
from concurrent.futures import ThreadPoolExecutor
import asyncio
from collections import deque
import json
import websockets

@dataclass
class AnalysisFrame:
    """Data for a single frame under analysis."""
    frame_id: int
    timestamp: float
    image: np.ndarray
    metadata: Dict[str, Any]
    analysis_results: Optional[Dict[str, Any]] = None

@dataclass
class StreamConfig:
    """Stream configuration."""
    source: Union[str, int]  # video file path or camera index
    fps: int = 30
    buffer_size: int = 100
    analysis_interval: int = 1  # analyze every N-th frame
    enable_recording: bool = False
    output_path: Optional[str] = None

class RealTimeAnalyzer:
    """Real-time image analyzer."""

    def __init__(self, config: StreamConfig):
        self.config = config
        self.logger = logging.getLogger(f"{__name__}.RealTimeAnalyzer")

        # Initialize components
        self._initialize_components()

        # State
        self.is_running = False
        self.frame_count = 0
        self.analysis_count = 0

        # Performance metrics
        self.performance_metrics = {
            'fps': 0.0,
            'analysis_fps': 0.0,
            'processing_time': 0.0,
            'queue_size': 0
        }
    
    def _initialize_components(self):
        """Initialize components."""
        # Frame buffer queues
        self.frame_queue = queue.Queue(maxsize=self.config.buffer_size)
        self.result_queue = queue.Queue(maxsize=self.config.buffer_size)

        # Analyzer components
        from .image_analysis_framework import ImageAnalysisFramework
        from .computer_vision import ComputerVisionProcessor
        from .intelligent_processing import IntelligentImageProcessor

        # ImageAnalysisFramework requires a config dict; an empty one selects no models
        self.ai_analyzer = ImageAnalysisFramework({})
        self.cv_processor = ComputerVisionProcessor()
        self.intelligent_processor = IntelligentImageProcessor()

        # Analysis callbacks
        self.analysis_callbacks: List[Callable] = []

        # Thread pool
        self.executor = ThreadPoolExecutor(max_workers=4)

        # Performance counters (timestamps of recent frames)
        self.fps_counter = deque(maxlen=30)
        self.analysis_fps_counter = deque(maxlen=30)
    
    def add_analysis_callback(self, callback: Callable[[AnalysisFrame], None]):
        """Register a callback invoked after each analyzed frame."""
        self.analysis_callbacks.append(callback)
    
    def start_analysis(self):
        """Start real-time analysis."""
        if self.is_running:
            self.logger.warning("Analysis is already running")
            return

        self.is_running = True
        self.logger.info("Starting real-time analysis")

        # Start the worker threads
        self.capture_thread = threading.Thread(target=self._capture_frames, daemon=True)
        self.analysis_thread = threading.Thread(target=self._analyze_frames, daemon=True)
        self.display_thread = threading.Thread(target=self._display_results, daemon=True)
        self.monitor_thread = threading.Thread(target=self._monitor_performance, daemon=True)
        
        self.capture_thread.start()
        self.analysis_thread.start()
        self.display_thread.start()
        self.monitor_thread.start()
        
        self.logger.info("Real-time analysis started")
    
    def stop_analysis(self):
        """Stop real-time analysis."""
        if not self.is_running:
            return

        self.is_running = False
        self.logger.info("Stopping real-time analysis")

        # Wait for the worker threads to finish. A thread cannot join itself, so
        # skip the current thread (the display thread calls this method on 'q').
        for name in ('capture_thread', 'analysis_thread', 'display_thread', 'monitor_thread'):
            thread = getattr(self, name, None)
            if thread is not None and thread is not threading.current_thread():
                thread.join(timeout=5)

        # Release resources
        if hasattr(self, 'cap'):
            self.cap.release()
        
        cv2.destroyAllWindows()
        self.executor.shutdown(wait=True)
        
        self.logger.info("Real-time analysis stopped")
    
    def _capture_frames(self):
        """Frame capture thread."""
        # Open the video source; cv2.VideoCapture accepts a camera index or a file path
        self.cap = cv2.VideoCapture(self.config.source)

        if not self.cap.isOpened():
            self.logger.error(f"Failed to open video source: {self.config.source}")
            return

        # Request the target frame rate (not every backend honors this setting)
        self.cap.set(cv2.CAP_PROP_FPS, self.config.fps)

        # Optional video recording
        video_writer = None
        if self.config.enable_recording and self.config.output_path:
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            video_writer = cv2.VideoWriter(
                self.config.output_path, fourcc, self.config.fps, 
                (frame_width, frame_height)
            )
        
        frame_time = 1.0 / self.config.fps
        last_time = time.time()
        
        while self.is_running:
            ret, frame = self.cap.read()
            if not ret:
                self.logger.warning("Failed to read frame")
                break
            
            current_time = time.time()
            
            # Build the analysis frame
            analysis_frame = AnalysisFrame(
                frame_id=self.frame_count,
                timestamp=current_time,
                image=frame.copy(),
                metadata={
                    'source': self.config.source,
                    'fps': self.config.fps,
                    'frame_size': frame.shape
                }
            )
            
            # Enqueue the frame; drop it if the queue is full
            try:
                self.frame_queue.put(analysis_frame, timeout=0.1)
            except queue.Full:
                self.logger.warning("Frame queue is full, dropping frame")
            
            # Record video
            if video_writer:
                video_writer.write(frame)
            
            # Update counters
            self.frame_count += 1
            self.fps_counter.append(current_time)
            
            # Throttle to the target frame rate
            elapsed = current_time - last_time
            if elapsed < frame_time:
                time.sleep(frame_time - elapsed)
            last_time = time.time()
        
        # Clean up resources
        if video_writer:
            video_writer.release()
    
    def _analyze_frames(self):
        """Frame analysis thread."""
        while self.is_running:
            try:
                # Fetch the next frame
                analysis_frame = self.frame_queue.get(timeout=1.0)

                # Only analyze every N-th frame
                if analysis_frame.frame_id % self.config.analysis_interval != 0:
                    continue

                start_time = time.time()

                # Run the analysis
                analysis_results = self._perform_analysis(analysis_frame)
                analysis_frame.analysis_results = analysis_results

                # Record the processing time
                processing_time = time.time() - start_time
                analysis_frame.metadata['processing_time'] = processing_time

                # Push to the result queue
                try:
                    self.result_queue.put(analysis_frame, timeout=0.1)
                except queue.Full:
                    self.logger.warning("Result queue is full, dropping result")

                # Update counters
                self.analysis_count += 1
                self.analysis_fps_counter.append(time.time())

                # Invoke callbacks
                for callback in self.analysis_callbacks:
                    try:
                        callback(analysis_frame)
                    except Exception as e:
                        self.logger.error(f"Callback error: {e}")
                
            except queue.Empty:
                continue
            except Exception as e:
                self.logger.error(f"Analysis error: {e}")
    
    def _perform_analysis(self, frame: AnalysisFrame) -> Dict[str, Any]:
        """Run image analysis on a frame."""
        results = {}

        try:
            # AI analysis
            ai_result = self.ai_analyzer.analyze_image_array(frame.image)
            results['ai_analysis'] = {
                'classifications': ai_result.classifications,
                'objects': ai_result.objects,
                'confidence': ai_result.confidence
            }

            # Computer vision analysis
            cv_result = self.cv_processor.analyze_image_array(frame.image)
            results['cv_analysis'] = {
                'features': cv_result.features,
                'objects': cv_result.objects,
                'quality_metrics': cv_result.quality_metrics
            }

            # Enhancement suggestions (generated by the computer vision processor)
            quality_metrics = cv_result.quality_metrics
            enhancement_suggestions = self.cv_processor._generate_enhancement_suggestions(quality_metrics)
            results['enhancement_suggestions'] = enhancement_suggestions
            
        except Exception as e:
            self.logger.error(f"Analysis failed: {e}")
            results['error'] = str(e)
        
        return results
    
    def _display_results(self):
        """Result display thread."""
        while self.is_running:
            try:
                # Fetch the next result
                result_frame = self.result_queue.get(timeout=1.0)

                # Draw the analysis results on the image
                display_image = self._draw_analysis_results(result_frame)

                # Show the image
                cv2.imshow('Real-time Analysis', display_image)

                # Quit when 'q' is pressed
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    self.stop_analysis()
                    break
                
            except queue.Empty:
                continue
            except Exception as e:
                self.logger.error(f"Display error: {e}")
    
    def _draw_analysis_results(self, frame: AnalysisFrame) -> np.ndarray:
        """Draw the analysis results on the image."""
        image = frame.image.copy()

        if frame.analysis_results is None:
            return image

        # Draw object detection results
        if 'ai_analysis' in frame.analysis_results:
            ai_results = frame.analysis_results['ai_analysis']

            # Draw bounding boxes
            if 'objects' in ai_results:
                for obj in ai_results['objects']:
                    if 'bbox' in obj:
                        x, y, w, h = obj['bbox']
                        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)

                        # Draw the label
                        if 'label' in obj:
                            label = f"{obj['label']}: {obj.get('confidence', 0):.2f}"
                            cv2.putText(image, label, (x, y - 10),
                                      cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Draw performance information
        info_text = [
            f"Frame: {frame.frame_id}",
            f"FPS: {self.performance_metrics['fps']:.1f}",
            f"Analysis FPS: {self.performance_metrics['analysis_fps']:.1f}",
            f"Processing: {frame.metadata.get('processing_time', 0):.3f}s",
            f"Queue: {self.performance_metrics['queue_size']}"
        ]
        
        for i, text in enumerate(info_text):
            cv2.putText(image, text, (10, 30 + i * 25), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        
        return image
    
    def _monitor_performance(self):
        """Performance monitoring thread."""
        last_log_time = time.time()
        while self.is_running:
            try:
                current_time = time.time()
                
                # Compute capture FPS
                if len(self.fps_counter) > 1:
                    time_span = self.fps_counter[-1] - self.fps_counter[0]
                    if time_span > 0:
                        self.performance_metrics['fps'] = (len(self.fps_counter) - 1) / time_span
                
                # Compute analysis FPS
                if len(self.analysis_fps_counter) > 1:
                    time_span = self.analysis_fps_counter[-1] - self.analysis_fps_counter[0]
                    if time_span > 0:
                        self.performance_metrics['analysis_fps'] = (len(self.analysis_fps_counter) - 1) / time_span
                
                # Queue size
                self.performance_metrics['queue_size'] = self.frame_queue.qsize()
                
                # Log by elapsed time: sampled once per second, the frame counter
                # may never land exactly on a multiple of 300
                if current_time - last_log_time >= 10.0:  # log every 10 seconds
                    self.logger.info(f"Performance: {self.performance_metrics}")
                    last_log_time = current_time
                
                time.sleep(1.0)
                
            except Exception as e:
                self.logger.error(f"Performance monitoring error: {e}")
                time.sleep(1.0)  # avoid a tight loop if the error persists
    
    def get_performance_metrics(self) -> Dict[str, float]:
        """Return a copy of the current performance metrics."""
        return self.performance_metrics.copy()
    
    def export_analysis_data(self, output_path: str):
        """Export analysis data."""
        # Placeholder: exporting the analysis data can be implemented here
        pass

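# WebSocket server for real-time data transfer
class AnalysisWebSocketServer:
    """WebSocket server that broadcasts analysis results."""
    
    def __init__(self, analyzer: RealTimeAnalyzer, port: int = 8765):
        self.analyzer = analyzer
        self.port = port
        self.clients = set()
        self.loop = None  # event loop that serves the clients, set in start_server()
        self.logger = logging.getLogger(f"{__name__}.AnalysisWebSocketServer")
        
        # Register the broadcast callback with the analyzer
        self.analyzer.add_analysis_callback(self._broadcast_results)
    
    async def register_client(self, websocket, path=None):
        """Register a client and keep it until the connection closes."""
        # `path` is optional because newer websockets releases call the
        # handler with the connection only
        self.clients.add(websocket)
        self.logger.info(f"Client connected: {websocket.remote_address}")
        
        try:
            await websocket.wait_closed()
        finally:
            self.clients.discard(websocket)
            self.logger.info(f"Client disconnected: {websocket.remote_address}")
    
    def _broadcast_results(self, frame: AnalysisFrame):
        """Broadcast analysis results to all clients."""
        if not self.clients or self.loop is None or not self.loop.is_running():
            return
        
        # Prepare the payload
        data = {
            'frame_id': frame.frame_id,
            'timestamp': frame.timestamp,
            'analysis_results': frame.analysis_results,
            'metadata': frame.metadata
        }
        
        # This callback is assumed to run on an analyzer worker thread, where
        # asyncio.create_task() has no running loop, so the coroutine is handed
        # to the server's event loop instead. default=str keeps json.dumps from
        # failing on values it cannot serialize, such as NumPy types.
        message = json.dumps(data, default=str)
        asyncio.run_coroutine_threadsafe(self._send_to_clients(message), self.loop)
    
    async def _send_to_clients(self, message: str):
        """Send a message to every connected client."""
        if not self.clients:
            return
        
        # Send concurrently; iterate over a snapshot because clients may
        # disconnect while the sends are in flight
        await asyncio.gather(
            *[client.send(message) for client in list(self.clients)],
            return_exceptions=True
        )
    
    def start_server(self):
        """Start the WebSocket server."""
        # Remember the caller's current event loop, which will run the server
        self.loop = asyncio.get_event_loop()
        return websockets.serve(self.register_client, "localhost", self.port)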

# Usage example
def example_realtime_analysis():
    """Real-time analysis usage example."""
    # Configuration
    config = StreamConfig(
        source=0,  # use the default camera
        fps=30,
        buffer_size=50,
        analysis_interval=3,  # analyze every 3rd frame
        enable_recording=True,
        output_path="realtime_analysis.avi"
    )
    
    # Create the analyzer
    analyzer = RealTimeAnalyzer(config)
    
    # Add a custom callback
    def custom_callback(frame: AnalysisFrame):
        if frame.analysis_results:
            print(f"Frame {frame.frame_id} analyzed in {frame.metadata.get('processing_time', 0):.3f}s")
    
    analyzer.add_analysis_callback(custom_callback)
    
    # Create the WebSocket server
    ws_server = AnalysisWebSocketServer(analyzer)
    
    # Create the event loop before the try block so that the finally clause
    # can always close it, even if start_analysis() raises
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    
    try:
        # Start analysis
        analyzer.start_analysis()
        
        # Start the WebSocket server
        start_server = ws_server.start_server()
        loop.run_until_complete(start_server)
        
        print("Real-time analysis started. Press 'q' in the video window to stop.")
        print("WebSocket server running on ws://localhost:8765")
        
        # Run the event loop
        loop.run_forever()
        
    except KeyboardInterrupt:
        print("Stopping analysis...")
    finally:
        analyzer.stop_analysis()
        loop.close()

if __name__ == "__main__":
    example_realtime_analysis()
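
The WebSocket server's clients live on an asyncio event loop, while analysis callbacks are assumed to fire on a worker thread. A coroutine cannot be scheduled from such a thread with `asyncio.create_task()`; one way to hand it over is `asyncio.run_coroutine_threadsafe`. The following is a minimal sketch of that handoff, not the chapter's implementation: `deliver`, `worker`, and `received` are hypothetical names, and `deliver` stands in for the real client send.

```python
import asyncio
import threading

received = []  # messages that reached the event loop


async def deliver(message: str) -> None:
    """Stand-in for sending to WebSocket clients; runs on the event loop."""
    received.append(message)


def worker(loop: asyncio.AbstractEventLoop, count: int) -> None:
    """Runs on a plain thread, so it must not call asyncio.create_task()."""
    for i in range(count):
        future = asyncio.run_coroutine_threadsafe(deliver(f"frame {i}"), loop)
        future.result(timeout=5)  # block this thread until the loop has run it


async def main() -> None:
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=worker, args=(loop, 3))
    thread.start()
    # Join in an executor so the loop stays free to run deliver()
    await loop.run_in_executor(None, thread.join)


asyncio.run(main())
print(received)
```

In a real broadcaster the worker would usually not wait on `future.result()`, since blocking the analysis thread on network sends would slow down frame processing.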

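Summary

Key Concepts

  1. Deep learning framework integration

    • Loading models and running inference with PyTorch/TensorFlow
    • Using and fine-tuning pretrained models
    • Multimodal analysis (CLIP and similar models)
    • Model performance optimization
  2. Computer vision techniques

    • Classical CV algorithms (SIFT, ORB, HOG, etc.)
    • Image feature extraction and analysis
    • Object detection and segmentation
    • Image quality assessment
  3. Intelligent image processing

    • Adaptive image enhancement
    • AI-based image restoration
    • Super-resolution and denoising
    • Intelligent filter application
  4. Real-time analysis systems

    • Multithreaded video processing
    • Streaming data analysis
    • Performance monitoring and optimization
    • Real-time communication over WebSocket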
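
The performance monitor in the real-time analyzer derives FPS from a window of frame timestamps as (n - 1) / (t_last - t_first). A minimal standalone sketch of that calculation follows; the `FpsWindow` class and its `maxlen` parameter are hypothetical and only illustrate the formula.

```python
from collections import deque
from typing import Deque


class FpsWindow:
    """Sliding window of frame timestamps for estimating FPS."""

    def __init__(self, maxlen: int = 30) -> None:
        # deque(maxlen=...) silently discards the oldest timestamp when full
        self.stamps: Deque[float] = deque(maxlen=maxlen)

    def tick(self, timestamp: float) -> None:
        """Record the arrival time of one frame, in seconds."""
        self.stamps.append(timestamp)

    def fps(self) -> float:
        """Return intervals per second over the window, or 0.0 if unknown."""
        if len(self.stamps) < 2:
            return 0.0
        span = self.stamps[-1] - self.stamps[0]
        # n timestamps enclose n - 1 frame intervals
        return (len(self.stamps) - 1) / span if span > 0 else 0.0


window = FpsWindow(maxlen=5)
for i in range(10):
    window.tick(i * 0.5)  # one frame every 0.5 s
print(window.fps())
```

A shorter window reacts faster to changes in frame rate, while a longer one gives a steadier reading.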

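Practical Skills

  1. AI model integration

    • Model selection and evaluation
    • Inference optimization techniques
    • Batching and parallel computation
    • Model version management
  2. Image analysis pipelines

    • End-to-end analysis systems
    • Result fusion and decision making
    • Anomaly detection and handling
    • Quality control mechanisms
  3. Performance optimization

    • GPU-accelerated computation
    • Memory management optimization
    • Concurrency strategies
    • Cache design
  4. System integration

    • API design
    • Microservice architecture
    • Containerized deployment
    • Monitoring and logging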
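
One concurrency strategy for a live stream is to bound the frame queue and discard the oldest frame when analysis falls behind, so that latency stays bounded. The sketch below shows that policy with the standard `queue.Queue`; it is one possible policy and not necessarily what `RealTimeAnalyzer` does, and `put_latest` is a hypothetical helper.

```python
import queue
from typing import Any


def put_latest(q: queue.Queue, item: Any) -> bool:
    """Enqueue item, discarding the oldest entry first if the queue is full.

    Returns True if an older entry had to be dropped.
    """
    dropped = False
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                q.get_nowait()  # make room by dropping the oldest entry
                dropped = True
            except queue.Empty:
                pass  # a consumer emptied the queue in the meantime; retry


frames: queue.Queue = queue.Queue(maxsize=3)
drops = sum(put_latest(frames, i) for i in range(5))
remaining = [frames.get_nowait() for _ in range(frames.qsize())]
print(drops, remaining)
```

Dropping old frames favors freshness over completeness, which suits a live display; a recording or audit pipeline would more likely block the producer instead.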

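Best Practices

  1. Model management

    • Keep models under version control
    • Run A/B tests
    • Monitor model performance
    • Update models regularly
  2. Data processing

    • Standardize data preprocessing
    • Increase data diversity
    • Handle edge cases
    • Protect private data
  3. System design

    • Modular architecture
    • Error handling mechanisms
    • Resource management strategies
    • Scalability considerations
  4. Quality assurance

    • Automated testing
    • Performance benchmarking
    • User feedback collection
    • Continuous improvement process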
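
One edge case worth handling before drawing or cropping is a detection box that extends past the frame. The sketch below assumes boxes in the `(x, y, w, h)` pixel format used by the drawing code in this chapter; `clamp_bbox` is a hypothetical helper, not part of the chapter's code.

```python
from typing import Optional, Sequence, Tuple


def clamp_bbox(
    bbox: Sequence[float], width: int, height: int
) -> Optional[Tuple[int, int, int, int]]:
    """Clip an (x, y, w, h) box to the image, or return None if nothing is left."""
    x, y, w, h = (int(round(v)) for v in bbox)
    # Convert to corner coordinates and clip each corner to the image bounds
    x1, y1 = max(x, 0), max(y, 0)
    x2, y2 = min(x + w, width), min(y + h, height)
    if x2 <= x1 or y2 <= y1:
        return None  # the box lies entirely outside the image
    return x1, y1, x2 - x1, y2 - y1


print(clamp_bbox((-10, 5, 50, 20), 100, 100))
print(clamp_bbox((90, 90, 30, 30), 100, 100))
print(clamp_bbox((200, 200, 10, 10), 100, 100))
```

Returning `None` for boxes that fall fully outside the frame lets the caller skip them instead of drawing a zero-area rectangle.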

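Further Considerations

Advanced Feature Development

  1. Multimodal fusion

    • Image + text analysis
    • Video + audio processing
    • Sensor data fusion
    • Cross-modal retrieval
  2. Edge computing

    • Model compression techniques
    • Mobile deployment
    • Offline analysis
    • Real-time inference optimization
  3. Federated learning

    • Distributed training
    • Privacy-preserving learning
    • Model aggregation strategies
    • Communication optimization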

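Enterprise Features

  1. Large-scale deployment

    • Cluster management
    • Load balancing
    • Autoscaling
    • Failure recovery
  2. Data management

    • Data lake architecture
    • Metadata management
    • Data lineage tracking
    • Compliance assurance
  3. Business integration

    • Workflow engines
    • Business rule engines
    • Reports and dashboards
    • User permission management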

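Integration and Extension

  1. Cloud service integration

    • AWS/Azure/GCP AI services
    • Container orchestration platforms
    • Serverless computing
    • Managed databases
  2. Third-party tools

    • MLOps platforms
    • Monitoring tools
    • Visualization tools
    • Collaboration platforms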

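Next Steps

Paths for Deeper Study

  1. Advanced deep learning

    • Transformer architectures
    • Generative adversarial networks
    • Reinforcement learning
    • Neural architecture search
  2. Frontiers of computer vision

    • 3D vision
    • Video understanding
    • Scene graph generation
    • Visual reasoning
  3. System architecture

    • Microservice architecture
    • Event-driven architecture
    • Stream processing systems
    • Distributed systems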

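Suggested Practice Projects

  1. Smart photo album system

    • Automatic classification and tagging
    • Face recognition and clustering
    • Scene understanding
    • Intelligent search
  2. Industrial quality inspection system

    • Defect detection
    • Quality assessment
    • Real-time monitoring
    • Report generation
  3. Medical image analysis

    • Lesion detection
    • Image segmentation
    • Diagnostic assistance
    • Report generation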


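This chapter covered advanced applications of AI and deep learning in image analysis. These techniques help you build intelligent image processing systems and provide analysis capabilities for a wide range of real-world scenarios.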
