迁移学习:站在预训练模型的肩膀上

🎯 前言:从零开始 VS 站在巨人的肩膀上

想象一下,你要学习开车🚗。有两种方法:

方法一(从零开始):先学习发动机原理,然后自己造轮子,接着焊接车架,最后组装成汽车再学开车。这得折腾到什么时候?

方法二(站在巨人肩膀上):直接买辆现成的车,在驾校教练的指导下,基于已有的驾驶经验快速上手。几个月就能拿到驾照!

在深度学习的世界里,迁移学习就是方法二!它让我们能够利用那些在海量数据上训练好的预训练模型,快速解决自己的问题。就像站在巨人的肩膀上摘星星一样🌟

今天,我们就来学习如何当一个聪明的"站肩膀"专家,让AI模型训练从几个月缩短到几小时!

📚 目录

🧠 什么是迁移学习?

生活中的迁移学习

迁移学习在生活中无处不在:

  • 学外语:学会了英语,再学法语就容易多了(都是拉丁字母)🇫🇷
  • 学乐器:会弹钢琴的人学电子琴很快(手指技巧相通)🎹
  • 学运动:会打网球的人学羽毛球上手更快(挥拍动作相似)🏸
  • 学编程:会Python的人学Java相对容易(编程思维相通)💻

机器学习中的迁移学习

在机器学习中,迁移学习是指:

  • 把在源任务上训练好的模型
  • 迁移到目标任务
  • 利用已学到的通用特征
  • 快速适应新的问题域
# 传统方法:从零开始训练
model = create_model()
model.train(your_small_dataset)  # 可能需要几天甚至几周
accuracy = model.evaluate()      # 结果:60%准确率😢

# 迁移学习方法:站在巨人肩膀上
pretrained_model = load_pretrained_model()
model = adapt_model(pretrained_model, your_task)
model.fine_tune(your_small_dataset)  # 只需要几小时
accuracy = model.evaluate()          # 结果:90%准确率🎉

🎯 迁移学习的核心思想

1. 特征的层次性

深度神经网络学习特征是有层次的:

# 以图像识别为例
# 底层特征(通用):边缘、线条、纹理
# 中层特征(半通用):眼睛、鼻子、轮廓
# 高层特征(专用):具体的物体类别

class CNNFeatureHierarchy:
    def __init__(self):
        self.low_level = "边缘、线条、颜色"      # 🔄 可以复用
        self.mid_level = "形状、纹理、局部特征"   # 🔄 可以复用
        self.high_level = "具体物体类别"         # ❌ 需要重新训练

2. 迁移学习的基本策略

def transfer_learning_strategy():
    """
    迁移学习的基本策略
    """
    # 1. 冻结预训练模型的底层
    for layer in pretrained_model.layers[:-3]:
        layer.trainable = False  # 冻结底层特征提取器
    
    # 2. 替换顶层分类器
    pretrained_model.layers[-1] = Dense(num_classes, activation='softmax')
    
    # 3. 在新数据上微调
    pretrained_model.compile(optimizer='adam', loss='categorical_crossentropy')
    pretrained_model.fit(new_data, new_labels, epochs=10)
    
    return pretrained_model

3. 迁移学习的适用场景

# 适用场景判断表
transfer_learning_scenarios = {
    "数据量小 + 任务相似": "冻结大部分层,只训练顶层",
    "数据量小 + 任务不同": "使用预训练特征,重新训练分类器",
    "数据量大 + 任务相似": "全网络微调,使用较小学习率",
    "数据量大 + 任务不同": "使用预训练权重初始化,正常训练"
}

def choose_strategy(data_size, task_similarity):
    """选择合适的迁移学习策略"""
    if data_size == "small" and task_similarity == "high":
        return "feature_extraction"
    elif data_size == "small" and task_similarity == "low":
        return "fine_tuning_top_only"
    elif data_size == "large" and task_similarity == "high":
        return "fine_tuning_all"
    else:
        return "weight_initialization"

🏛️ 预训练模型的宝库

计算机视觉领域的明星模型

# 1. ResNet系列 - 图像分类的老大哥
import torchvision.models as models

# ResNet-50:平衡性能和速度
resnet50 = models.resnet50(pretrained=True)
print(f"ResNet-50参数量:{sum(p.numel() for p in resnet50.parameters()):,}")

# ResNet-101:更深更强
resnet101 = models.resnet101(pretrained=True)

# 2. VGG系列 - 简单粗暴的经典
vgg16 = models.vgg16(pretrained=True)
vgg19 = models.vgg19(pretrained=True)

# 3. Inception系列 - 多尺度特征提取专家
inception_v3 = models.inception_v3(pretrained=True)

# 4. MobileNet系列 - 移动端的轻量化选择
mobilenet_v2 = models.mobilenet_v2(pretrained=True)
print(f"MobileNet-V2参数量:{sum(p.numel() for p in mobilenet_v2.parameters()):,}")

# 5. EfficientNet系列 - 效率之王
# pip install efficientnet-pytorch
from efficientnet_pytorch import EfficientNet
efficientnet_b0 = EfficientNet.from_pretrained('efficientnet-b0')

自然语言处理领域的超级明星

# 1. BERT系列 - NLP的革命者
from transformers import BertModel, BertTokenizer

bert_model = BertModel.from_pretrained('bert-base-uncased')
bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# 2. GPT系列 - 生成式语言模型
from transformers import GPT2Model, GPT2Tokenizer

gpt2_model = GPT2Model.from_pretrained('gpt2')
gpt2_tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# 3. RoBERTa - BERT的改进版
from transformers import RobertaModel, RobertaTokenizer

roberta_model = RobertaModel.from_pretrained('roberta-base')
roberta_tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# 4. 中文预训练模型
chinese_bert = BertModel.from_pretrained('bert-base-chinese')

多模态预训练模型

# CLIP - 连接图像和文本的桥梁
import clip
import torch

# 加载CLIP模型
device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model, preprocess = clip.load("ViT-B/32", device=device)

# 图像和文本的联合理解
image = preprocess(your_image).unsqueeze(0).to(device)
text = clip.tokenize(["a cat", "a dog"]).to(device)

with torch.no_grad():
    image_features = clip_model.encode_image(image)
    text_features = clip_model.encode_text(text)
    
    similarity = (100.0 * image_features @ text_features.T).softmax(dim=-1)
    print(f"图像与文本的相似度: {similarity}")

🔧 迁移学习的实现策略

1. 特征提取(Feature Extraction)

import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms

class FeatureExtractor(nn.Module):
    def __init__(self, pretrained_model, num_classes):
        super().__init__()
        # 去掉预训练模型的最后一层
        self.features = nn.Sequential(*list(pretrained_model.children())[:-1])
        
        # 冻结特征提取器的参数
        for param in self.features.parameters():
            param.requires_grad = False
        
        # 添加新的分类器
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(pretrained_model.fc.in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
    
    def forward(self, x):
        features = self.features(x)
        output = self.classifier(features)
        return output

# 使用示例
def create_feature_extractor(num_classes=10):
    # 加载预训练的ResNet-50
    resnet50 = models.resnet50(pretrained=True)
    
    # 创建特征提取器
    model = FeatureExtractor(resnet50, num_classes)
    
    print("冻结的参数数量:", sum(p.numel() for p in model.features.parameters()))
    print("可训练的参数数量:", sum(p.numel() for p in model.classifier.parameters()))
    
    return model

2. 微调(Fine-tuning)

class FineTuner(nn.Module):
    def __init__(self, pretrained_model, num_classes):
        super().__init__()
        self.backbone = pretrained_model
        
        # 替换最后的分类层
        self.backbone.fc = nn.Linear(
            self.backbone.fc.in_features, 
            num_classes
        )
    
    def forward(self, x):
        return self.backbone(x)

def fine_tune_model(num_classes, freeze_layers=None):
    # 加载预训练模型
    resnet50 = models.resnet50(pretrained=True)
    
    # 创建微调模型
    model = FineTuner(resnet50, num_classes)
    
    # 选择性冻结某些层
    if freeze_layers:
        for name, param in model.named_parameters():
            if any(layer in name for layer in freeze_layers):
                param.requires_grad = False
                print(f"冻结层: {name}")
    
    # 为不同层设置不同的学习率
    params = [
        {"params": model.backbone.layer4.parameters(), "lr": 1e-4},
        {"params": model.backbone.fc.parameters(), "lr": 1e-3}
    ]
    
    optimizer = torch.optim.Adam(params)
    
    return model, optimizer

3. 渐进式解冻(Gradual Unfreezing)

class GradualUnfreezer:
    def __init__(self, model, layer_groups):
        self.model = model
        self.layer_groups = layer_groups
        self.current_group = 0
        
        # 初始时冻结所有层
        for param in model.parameters():
            param.requires_grad = False
    
    def unfreeze_next_group(self):
        """解冻下一组层"""
        if self.current_group < len(self.layer_groups):
            for layer_name in self.layer_groups[self.current_group]:
                for name, param in self.model.named_parameters():
                    if layer_name in name:
                        param.requires_grad = True
                        print(f"解冻层: {name}")
            
            self.current_group += 1
            return True
        return False
    
    def get_trainable_params(self):
        """获取当前可训练的参数"""
        return [p for p in self.model.parameters() if p.requires_grad]

# 使用示例
def progressive_fine_tuning():
    model = models.resnet50(pretrained=True)
    
    # 定义层组(从顶层到底层)
    layer_groups = [
        ["fc"],                    # 第1组:分类器
        ["layer4"],                # 第2组:最后一个残差块
        ["layer3"],                # 第3组:倒数第二个残差块
        ["layer2", "layer1"]       # 第4组:前面的层
    ]
    
    unfreezer = GradualUnfreezer(model, layer_groups)
    
    # 训练循环
    for epoch in range(20):
        if epoch % 5 == 0:  # 每5个epoch解冻一组
            unfreezer.unfreeze_next_group()
            
            # 重新创建优化器
            optimizer = torch.optim.Adam(
                unfreezer.get_trainable_params(), 
                lr=1e-4
            )
        
        # 训练一个epoch
        train_one_epoch(model, optimizer)

💻 实战项目:打造个人图片分类器

让我们用迁移学习来创建一个能识别你的宠物类型的分类器!

1. 数据准备

import os
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms as transforms

class PetDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.images = []
        self.labels = []
        
        # 扫描数据目录
        class_names = os.listdir(data_dir)
        self.class_to_idx = {name: idx for idx, name in enumerate(class_names)}
        
        for class_name in class_names:
            class_dir = os.path.join(data_dir, class_name)
            if os.path.isdir(class_dir):
                for img_name in os.listdir(class_dir):
                    if img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
                        self.images.append(os.path.join(class_dir, img_name))
                        self.labels.append(self.class_to_idx[class_name])
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img_path = self.images[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# 数据增强和预处理
def get_transforms():
    train_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],  # ImageNet标准
            std=[0.229, 0.224, 0.225]
        )
    ])
    
    val_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])
    
    return train_transform, val_transform

# 创建数据加载器
def create_data_loaders(data_dir, batch_size=32):
    train_transform, val_transform = get_transforms()
    
    train_dataset = PetDataset(
        os.path.join(data_dir, 'train'), 
        transform=train_transform
    )
    
    val_dataset = PetDataset(
        os.path.join(data_dir, 'val'), 
        transform=val_transform
    )
    
    train_loader = DataLoader(
        train_dataset, 
        batch_size=batch_size, 
        shuffle=True, 
        num_workers=4
    )
    
    val_loader = DataLoader(
        val_dataset, 
        batch_size=batch_size, 
        shuffle=False, 
        num_workers=4
    )
    
    return train_loader, val_loader, train_dataset.class_to_idx

2. 模型构建

import torch.nn as nn
import torchvision.models as models

class PetClassifier(nn.Module):
    def __init__(self, num_classes, pretrained=True):
        super().__init__()
        
        # 使用预训练的ResNet-50
        self.backbone = models.resnet50(pretrained=pretrained)
        
        # 冻结早期层
        for param in list(self.backbone.parameters())[:-10]:
            param.requires_grad = False
        
        # 替换分类器
        self.backbone.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(self.backbone.fc.in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes)
        )
    
    def forward(self, x):
        return self.backbone(x)

# 模型训练函数
def train_model(model, train_loader, val_loader, num_epochs=10):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    # 优化器和损失函数
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
    
    best_val_acc = 0.0
    
    for epoch in range(num_epochs):
        # 训练阶段
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            
            optimizer.zero_grad()
            outputs = model(data)
            loss = criterion(outputs, target)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            train_total += target.size(0)
            train_correct += (predicted == target).sum().item()
            
            if batch_idx % 10 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], '
                      f'Batch [{batch_idx}/{len(train_loader)}], '
                      f'Loss: {loss.item():.4f}')
        
        # 验证阶段
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                outputs = model(data)
                loss = criterion(outputs, target)
                
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                val_total += target.size(0)
                val_correct += (predicted == target).sum().item()
        
        # 计算准确率
        train_acc = 100. * train_correct / train_total
        val_acc = 100. * val_correct / val_total
        
        print(f'Epoch [{epoch+1}/{num_epochs}]:')
        print(f'  训练损失: {train_loss/len(train_loader):.4f}, 训练准确率: {train_acc:.2f}%')
        print(f'  验证损失: {val_loss/len(val_loader):.4f}, 验证准确率: {val_acc:.2f}%')
        print('-' * 50)
        
        # 保存最佳模型
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_pet_classifier.pth')
        
        scheduler.step()
    
    return model

# 使用示例
def main():
    # 数据目录结构:
    # data/
    #   train/
    #     cats/
    #     dogs/
    #     birds/
    #   val/
    #     cats/
    #     dogs/
    #     birds/
    
    train_loader, val_loader, class_to_idx = create_data_loaders(
        'data', batch_size=32
    )
    
    num_classes = len(class_to_idx)
    model = PetClassifier(num_classes)
    
    print(f"类别映射: {class_to_idx}")
    print(f"模型参数数量: {sum(p.numel() for p in model.parameters()):,}")
    print(f"可训练参数数量: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")
    
    # 训练模型
    trained_model = train_model(model, train_loader, val_loader, num_epochs=10)
    
    return trained_model, class_to_idx

if __name__ == "__main__":
    model, class_mapping = main()

3. 模型推理和部署

def load_and_predict(model_path, image_path, class_to_idx):
    """加载模型并进行预测"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # 加载模型
    model = PetClassifier(len(class_to_idx))
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    
    # 预处理图像
    _, val_transform = get_transforms()
    image = Image.open(image_path).convert('RGB')
    image_tensor = val_transform(image).unsqueeze(0).to(device)
    
    # 预测
    with torch.no_grad():
        outputs = model(image_tensor)
        probabilities = torch.nn.functional.softmax(outputs, dim=1)
        confidence, predicted = torch.max(probabilities, 1)
    
    # 获取类别名称
    idx_to_class = {v: k for k, v in class_to_idx.items()}
    predicted_class = idx_to_class[predicted.item()]
    confidence_score = confidence.item()
    
    return predicted_class, confidence_score

# 批量预测
def batch_predict(model_path, image_dir, class_to_idx):
    """批量预测图像"""
    results = []
    
    for img_name in os.listdir(image_dir):
        if img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
            img_path = os.path.join(image_dir, img_name)
            predicted_class, confidence = load_and_predict(
                model_path, img_path, class_to_idx
            )
            
            results.append({
                'image': img_name,
                'predicted_class': predicted_class,
                'confidence': confidence
            })
    
    return results

# 创建简单的Web API
from flask import Flask, request, jsonify
import base64
import io

app = Flask(__name__)

# 全局变量
MODEL = None
CLASS_TO_IDX = None

@app.route('/predict', methods=['POST'])
def predict():
    try:
        # 获取上传的图像
        file = request.files['image']
        image = Image.open(file.stream).convert('RGB')
        
        # 预处理和预测
        _, val_transform = get_transforms()
        image_tensor = val_transform(image).unsqueeze(0)
        
        with torch.no_grad():
            outputs = MODEL(image_tensor)
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            confidence, predicted = torch.max(probabilities, 1)
        
        # 返回结果
        idx_to_class = {v: k for k, v in CLASS_TO_IDX.items()}
        result = {
            'predicted_class': idx_to_class[predicted.item()],
            'confidence': confidence.item(),
            'all_probabilities': {
                idx_to_class[i]: prob.item() 
                for i, prob in enumerate(probabilities[0])
            }
        }
        
        return jsonify(result)
    
    except Exception as e:
        return jsonify({'error': str(e)}), 500

def start_api(model_path, class_to_idx):
    """启动API服务"""
    global MODEL, CLASS_TO_IDX
    
    # 加载模型
    MODEL = PetClassifier(len(class_to_idx))
    MODEL.load_state_dict(torch.load(model_path, map_location='cpu'))
    MODEL.eval()
    
    CLASS_TO_IDX = class_to_idx
    
    print("🚀 API服务启动中...")
    print("📱 使用方法:POST /predict,上传图像文件")
    app.run(host='0.0.0.0', port=5000, debug=True)

📝 文本领域的迁移学习

1. 使用BERT进行文本分类

from transformers import BertTokenizer, BertForSequenceClassification
from transformers import Trainer, TrainingArguments
import torch
from torch.utils.data import Dataset

class TextClassificationDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]
        
        # 分词和编码
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

class SentimentClassifier:
    def __init__(self, model_name='bert-base-uncased', num_labels=2):
        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.model = BertForSequenceClassification.from_pretrained(
            model_name, 
            num_labels=num_labels
        )
    
    def prepare_data(self, texts, labels):
        """准备训练数据"""
        return TextClassificationDataset(
            texts, labels, self.tokenizer
        )
    
    def train(self, train_dataset, val_dataset, output_dir='./results'):
        """训练模型"""
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=3,
            per_device_train_batch_size=16,
            per_device_eval_batch_size=16,
            warmup_steps=500,
            weight_decay=0.01,
            logging_dir='./logs',
            logging_steps=10,
            evaluation_strategy='steps',
            eval_steps=500,
            save_strategy='steps',
            save_steps=500,
            load_best_model_at_end=True,
        )
        
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
        )
        
        trainer.train()
        return trainer
    
    def predict(self, text):
        """预测单个文本"""
        inputs = self.tokenizer(
            text,
            return_tensors='pt',
            truncation=True,
            padding=True,
            max_length=128
        )
        
        with torch.no_grad():
            outputs = self.model(**inputs)
            predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
            predicted_class = torch.argmax(predictions, dim=-1).item()
            confidence = predictions[0][predicted_class].item()
        
        return predicted_class, confidence

# 使用示例
def sentiment_analysis_example():
    # 示例数据
    texts = [
        "这个电影真的很棒!",
        "太无聊了,浪费时间",
        "还可以吧,不算太差",
        "强烈推荐,五星好评!",
        "完全看不下去"
    ]
    
    labels = [1, 0, 1, 1, 0]  # 1: 正面, 0: 负面
    
    # 创建分类器
    classifier = SentimentClassifier()
    
    # 准备数据
    train_dataset = classifier.prepare_data(texts, labels)
    
    # 训练(实际应用中需要更多数据)
    trainer = classifier.train(train_dataset, train_dataset)
    
    # 预测
    test_text = "这个产品质量很好"
    predicted_class, confidence = classifier.predict(test_text)
    
    sentiment = "正面" if predicted_class == 1 else "负面"
    print(f"文本:{test_text}")
    print(f"预测情感:{sentiment},置信度:{confidence:.2f}")

2. 使用预训练词向量

import gensim.downloader as api
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

class WordEmbeddingClassifier:
    def __init__(self, embedding_model='word2vec-google-news-300'):
        print(f"正在加载词向量模型: {embedding_model}")
        self.word_vectors = api.load(embedding_model)
        self.classifier = RandomForestClassifier(n_estimators=100)
    
    def text_to_vector(self, text):
        """将文本转换为向量"""
        words = text.lower().split()
        vectors = []
        
        for word in words:
            if word in self.word_vectors:
                vectors.append(self.word_vectors[word])
        
        if vectors:
            # 使用平均向量作为文本表示
            return np.mean(vectors, axis=0)
        else:
            # 如果没有找到任何词,返回零向量
            return np.zeros(self.word_vectors.vector_size)
    
    def prepare_features(self, texts):
        """准备特征向量"""
        features = []
        for text in texts:
            vector = self.text_to_vector(text)
            features.append(vector)
        return np.array(features)
    
    def train(self, texts, labels):
        """训练分类器"""
        features = self.prepare_features(texts)
        self.classifier.fit(features, labels)
    
    def predict(self, text):
        """预测单个文本"""
        vector = self.text_to_vector(text).reshape(1, -1)
        prediction = self.classifier.predict(vector)[0]
        probability = self.classifier.predict_proba(vector)[0]
        confidence = np.max(probability)
        
        return prediction, confidence

# 使用示例
def word_embedding_example():
    # 示例数据
    texts = [
        "I love this movie, it's amazing!",
        "This film is terrible, waste of time",
        "Pretty good movie, worth watching",
        "Absolutely fantastic, highly recommended!",
        "Boring and poorly made"
    ]
    
    labels = [1, 0, 1, 1, 0]  # 1: positive, 0: negative
    
    # 创建分类器
    classifier = WordEmbeddingClassifier()
    
    # 训练
    classifier.train(texts, labels)
    
    # 预测
    test_text = "This is a great product"
    prediction, confidence = classifier.predict(test_text)
    
    sentiment = "positive" if prediction == 1 else "negative"
    print(f"Text: {test_text}")
    print(f"Prediction: {sentiment}, Confidence: {confidence:.2f}")

🎨 进阶技巧:微调的艺术

1. 学习率调度策略

import torch.optim as optim
from torch.optim.lr_scheduler import *

class LearningRateScheduler:
    def __init__(self, optimizer, strategy='step'):
        self.optimizer = optimizer
        self.strategy = strategy
        self.scheduler = self._create_scheduler()
    
    def _create_scheduler(self):
        if self.strategy == 'step':
            return StepLR(self.optimizer, step_size=10, gamma=0.1)
        elif self.strategy == 'cosine':
            return CosineAnnealingLR(self.optimizer, T_max=50, eta_min=1e-6)
        elif self.strategy == 'exponential':
            return ExponentialLR(self.optimizer, gamma=0.95)
        elif self.strategy == 'plateau':
            return ReduceLROnPlateau(
                self.optimizer, mode='min', factor=0.5, 
                patience=5, verbose=True
            )
        else:
            raise ValueError(f"Unknown strategy: {self.strategy}")
    
    def step(self, metrics=None):
        if self.strategy == 'plateau':
            self.scheduler.step(metrics)
        else:
            self.scheduler.step()
    
    def get_lr(self):
        return self.scheduler.get_last_lr()[0]

# 差异化学习率
def create_discriminative_lr_optimizer(model, base_lr=1e-3):
    """为不同层设置不同的学习率"""
    
    # 分层参数
    layer_params = [
        {'params': model.backbone.layer1.parameters(), 'lr': base_lr * 0.1},
        {'params': model.backbone.layer2.parameters(), 'lr': base_lr * 0.3},
        {'params': model.backbone.layer3.parameters(), 'lr': base_lr * 0.5},
        {'params': model.backbone.layer4.parameters(), 'lr': base_lr * 0.8},
        {'params': model.backbone.fc.parameters(), 'lr': base_lr}
    ]
    
    optimizer = optim.Adam(layer_params)
    return optimizer

2. 数据增强策略

import albumentations as A
from albumentations.pytorch import ToTensorV2

class AdvancedAugmentation:
    def __init__(self, image_size=224):
        self.train_transform = A.Compose([
            A.Resize(image_size, image_size),
            A.RandomRotate90(p=0.5),
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.2),
            A.RandomBrightnessContrast(p=0.3),
            A.HueSaturationValue(p=0.3),
            A.GaussNoise(p=0.2),
            A.Blur(blur_limit=3, p=0.1),
            A.Cutout(num_holes=8, max_h_size=8, max_w_size=8, p=0.5),
            A.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            ),
            ToTensorV2()
        ])
        
        self.test_transform = A.Compose([
            A.Resize(image_size, image_size),
            A.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            ),
            ToTensorV2()
        ])
    
    def get_transforms(self):
        return self.train_transform, self.test_transform

# 混合增强(MixUp, CutMix)
class MixUpCutMix:
    def __init__(self, alpha=1.0, p=0.5):
        self.alpha = alpha
        self.p = p
    
    def mixup(self, x, y):
        """MixUp增强"""
        if np.random.rand() > self.p:
            return x, y
        
        batch_size = x.size(0)
        lam = np.random.beta(self.alpha, self.alpha)
        
        index = torch.randperm(batch_size)
        mixed_x = lam * x + (1 - lam) * x[index, :]
        
        y_a, y_b = y, y[index]
        return mixed_x, (y_a, y_b, lam)
    
    def cutmix(self, x, y):
        """CutMix增强"""
        if np.random.rand() > self.p:
            return x, y
        
        batch_size = x.size(0)
        lam = np.random.beta(self.alpha, self.alpha)
        
        index = torch.randperm(batch_size)
        
        # 随机选择区域
        W, H = x.size(2), x.size(3)
        cut_rat = np.sqrt(1. - lam)
        cut_w = np.int(W * cut_rat)
        cut_h = np.int(H * cut_rat)
        
        cx = np.random.randint(W)
        cy = np.random.randint(H)
        
        bbx1 = np.clip(cx - cut_w // 2, 0, W)
        bby1 = np.clip(cy - cut_h // 2, 0, H)
        bbx2 = np.clip(cx + cut_w // 2, 0, W)
        bby2 = np.clip(cy + cut_h // 2, 0, H)
        
        x[:, :, bbx1:bbx2, bby1:bby2] = x[index, :, bbx1:bbx2, bby1:bby2]
        
        # 调整lambda
        lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (W * H))
        
        y_a, y_b = y, y[index]
        return x, (y_a, y_b, lam)

3. 模型集成

class ModelEnsemble:
    def __init__(self, models):
        self.models = models
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        # 将所有模型移到同一设备
        for model in self.models:
            model.to(self.device)
            model.eval()
    
    def predict(self, x):
        """集成预测"""
        predictions = []
        
        with torch.no_grad():
            for model in self.models:
                output = model(x)
                predictions.append(torch.softmax(output, dim=1))
        
        # 平均预测结果
        ensemble_pred = torch.mean(torch.stack(predictions), dim=0)
        return ensemble_pred
    
    def predict_with_tta(self, x, tta_transforms):
        """使用测试时增强(TTA)的预测"""
        all_predictions = []
        
        with torch.no_grad():
            # 原始预测
            original_pred = self.predict(x)
            all_predictions.append(original_pred)
            
            # TTA预测
            for transform in tta_transforms:
                augmented_x = transform(x)
                tta_pred = self.predict(augmented_x)
                all_predictions.append(tta_pred)
        
        # 平均所有预测
        final_pred = torch.mean(torch.stack(all_predictions), dim=0)
        return final_pred

# 创建集成模型
def create_ensemble():
    models = [
        PetClassifier(num_classes=3, pretrained=True),
        PetClassifier(num_classes=3, pretrained=True),
        PetClassifier(num_classes=3, pretrained=True)
    ]
    
    # 加载不同的权重
    for i, model in enumerate(models):
        model.load_state_dict(torch.load(f'model_{i}.pth'))
    
    ensemble = ModelEnsemble(models)
    return ensemble

🚨 常见陷阱与避免方法

1. 过拟合问题

class OverfittingPrevention:
    def __init__(self, model, patience=7):
        self.model = model
        self.patience = patience
        self.best_loss = float('inf')
        self.counter = 0
        self.best_model_state = None
    
    def early_stopping(self, val_loss):
        """早停机制"""
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.counter = 0
            self.best_model_state = self.model.state_dict().copy()
            return False
        else:
            self.counter += 1
            if self.counter >= self.patience:
                print(f"Early stopping triggered after {self.counter} epochs without improvement")
                self.model.load_state_dict(self.best_model_state)
                return True
        return False
    
    def apply_regularization(self):
        """应用正则化技术"""
        regularization_techniques = {
            'dropout': self.add_dropout,
            'weight_decay': self.add_weight_decay,
            'batch_norm': self.add_batch_norm,
            'data_augmentation': self.add_data_augmentation
        }
        
        return regularization_techniques
    
    def add_dropout(self, p=0.5):
        """添加Dropout层"""
        for module in self.model.modules():
            if isinstance(module, torch.nn.Linear):
                # 在线性层前添加Dropout
                pass  # 实现细节
    
    def add_weight_decay(self, weight_decay=1e-4):
        """添加权重衰减"""
        optimizer = torch.optim.Adam(
            self.model.parameters(), 
            lr=1e-3, 
            weight_decay=weight_decay
        )
        return optimizer
    
    def monitor_training(self, train_loss, val_loss):
        """监控训练过程"""
        gap = train_loss - val_loss
        if gap > 0.1:  # 训练损失显著低于验证损失
            print("⚠️  警告:可能存在过拟合")
            print(f"训练损失: {train_loss:.4f}, 验证损失: {val_loss:.4f}")
            print("建议:减少模型复杂度或增加正则化")

2. 数据泄露问题

class DataLeakagePrevention:
    def __init__(self):
        self.seen_samples = set()
    
    def check_data_leakage(self, train_data, val_data, test_data):
        """检查数据泄露"""
        print("🔍 检查数据泄露...")
        
        # 转换为可哈希的形式
        train_hashes = {hash(str(sample)) for sample in train_data}
        val_hashes = {hash(str(sample)) for sample in val_data}
        test_hashes = {hash(str(sample)) for sample in test_data}
        
        # 检查重叠
        train_val_overlap = train_hashes.intersection(val_hashes)
        train_test_overlap = train_hashes.intersection(test_hashes)
        val_test_overlap = val_hashes.intersection(test_hashes)
        
        if train_val_overlap:
            print(f"❌ 发现训练集和验证集重叠:{len(train_val_overlap)} 个样本")
        
        if train_test_overlap:
            print(f"❌ 发现训练集和测试集重叠:{len(train_test_overlap)} 个样本")
        
        if val_test_overlap:
            print(f"❌ 发现验证集和测试集重叠:{len(val_test_overlap)} 个样本")
        
        if not (train_val_overlap or train_test_overlap or val_test_overlap):
            print("✅ 没有发现数据泄露")
    
    def temporal_split(self, data, time_column, split_date):
        """时间序列数据的正确分割"""
        train_data = data[data[time_column] < split_date]
        test_data = data[data[time_column] >= split_date]
        
        return train_data, test_data
    
    def stratified_split(self, data, target_column, test_size=0.2):
        """分层抽样分割"""
        from sklearn.model_selection import train_test_split
        
        train_data, test_data = train_test_split(
            data, 
            test_size=test_size, 
            stratify=data[target_column],
            random_state=42
        )
        
        return train_data, test_data

3. 预训练模型兼容性问题

class ModelCompatibilityChecker:
    def __init__(self):
        self.compatibility_matrix = {
            'image_size': {
                'resnet': 224,
                'inception': 299,
                'efficientnet': 224,
                'vgg': 224
            },
            'normalization': {
                'imagenet': {'mean': [0.485, 0.456, 0.406], 'std': [0.229, 0.224, 0.225]},
                'custom': {'mean': [0.5, 0.5, 0.5], 'std': [0.5, 0.5, 0.5]}
            }
        }
    
    def check_input_compatibility(self, model_name, input_size):
        """检查输入尺寸兼容性"""
        expected_size = self.compatibility_matrix['image_size'].get(model_name, 224)
        
        if input_size != expected_size:
            print(f"⚠️  输入尺寸不匹配:期望 {expected_size},实际 {input_size}")
            return False
        
        return True
    
    def get_recommended_preprocessing(self, model_name):
        """获取推荐的预处理方式"""
        if model_name in ['resnet', 'vgg', 'efficientnet']:
            return self.compatibility_matrix['normalization']['imagenet']
        else:
            return self.compatibility_matrix['normalization']['custom']
    
    def version_compatibility_check(self, model_name, framework_version):
        """检查框架版本兼容性"""
        compatibility_info = {
            'torch': {
                'min_version': '1.7.0',
                'recommended': '1.12.0'
            },
            'transformers': {
                'min_version': '4.0.0',
                'recommended': '4.20.0'
            }
        }
        
        # 实际的版本检查逻辑
        print(f"✅ 模型 {model_name} 与当前框架版本兼容")

🎬 下集预告

恭喜你!🎉 你已经完成了深度学习篇的所有内容,从基础的神经网络到高级的迁移学习,你已经掌握了深度学习的核心技术。现在你可以:

  • 理解并实现各种神经网络架构
  • 掌握卷积神经网络和循环神经网络
  • 运用自编码器和生成对抗网络
  • 熟练使用迁移学习快速解决实际问题

下一篇文章将开启全新的篇章——《计算机视觉篇》!我们将探索:

🔍 OpenCV入门:计算机视觉的瑞士军刀

  • 图像处理的基本操作
  • 计算机视觉的工具箱
  • 从像素到智能的转换

在计算机视觉的世界里,我们将学会让计算机"看懂"世界,就像给AI装上了一双慧眼!准备好进入这个充满挑战和惊喜的视觉世界了吗?

📝 总结与思考题

🌟 本文关键知识点

  1. 迁移学习概念:利用预训练模型快速解决新任务
  2. 核心策略:特征提取、微调、渐进式解冻
  3. 预训练模型库:计算机视觉和NLP领域的明星模型
  4. 实战技巧:数据增强、学习率调度、模型集成
  5. 常见陷阱:过拟合、数据泄露、兼容性问题

🤔 思考题

  1. 什么情况下应该使用特征提取而不是微调?
  2. 如何选择合适的预训练模型?
  3. 迁移学习在小数据集上的优势是什么?
  4. 如何评估迁移学习的效果?
  5. 跨领域迁移学习有哪些挑战?

📋 实践作业

  1. 基础练习:使用预训练ResNet实现花朵分类
  2. 进阶练习:比较不同迁移学习策略的效果
  3. 挑战练习:实现一个通用的迁移学习框架

🎯 深度学习篇总结

通过这8篇文章,你已经:

  • 🧠 深度学习入门:理解了深度学习的基本概念
  • 🔧 TensorFlow/PyTorch:掌握了深度学习框架的使用
  • 🕸️ 神经网络:从感知机到多层网络的演进
  • 👁️ 卷积神经网络:让AI学会看图的技术
  • 🔄 循环神经网络:让AI理解序列数据
  • 🎨 自编码器:AI的压缩与重建艺术
  • 🎭 生成对抗网络:AI的创作能力
  • 🏗️ 迁移学习:站在巨人的肩膀上

现在的你已经具备了深度学习的扎实基础,可以自信地说:"我懂深度学习!"接下来,让我们在计算机视觉的海洋中继续探索吧!🌊


💡 深度学习小贴士:迁移学习是深度学习实用化的关键技术。在实际项目中,99%的情况下我们都应该优先考虑迁移学习,除非你有谷歌那样的数据和计算资源!

🎯 下次预告:准备好让AI拥有"火眼金睛"了吗?计算机视觉的精彩世界正在等你!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐