发布日期:2025年12月21日
作者:DREAMVFIA_OSPM
字数:90000+
阅读时间:约120分钟


📋 目录

  1. 开篇:一场静悄悄的革命
  2. 注意力机制的诞生:从RNN到Self-Attention
  3. Transformer核心架构:层层剖析
  4. 位置编码:序列信息的艺术
  5. 多头注意力:并行的智慧
  6. 前馈网络与残差连接:稳定性的基石
  7. 从GPT到BERT:预训练范式的分野
  8. Vision Transformer:计算机视觉的新纪元
  9. 高效Transformer:稀疏注意力与线性复杂度
  10. Transformer在多模态中的应用
  11. 大模型时代:规模法则与涌现能力
  12. 未来展望:Transformer的下一个十年

🌟 第一章:开篇 - 一场静悄悄的革命

1.1 那篇改变一切的论文

2017年6月,Google Brain团队在arXiv上发布了一篇看似平淡无奇的论文《Attention Is All You Need》。彼时,没有人预料到这篇论文会成为AI历史上的分水岭。

论文的核心贡献

  • 完全抛弃RNN和CNN
  • 纯基于注意力机制构建序列模型
  • 在机器翻译任务上超越所有前辈

让我们先用代码直观感受Transformer的威力:

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Optional, Tuple
import math

# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# ============================================
# 第一个示例:最简单的自注意力机制
# ============================================

class SimpleAttention:
    """最简单的注意力机制实现"""
    
    def __init__(self):
        pass
    
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """
        缩放点积注意力
        
        Args:
            Q: Query矩阵 [batch_size, seq_len, d_k]
            K: Key矩阵 [batch_size, seq_len, d_k]
            V: Value矩阵 [batch_size, seq_len, d_v]
            mask: 掩码矩阵 [batch_size, seq_len, seq_len]
        
        Returns:
            output: 注意力输出 [batch_size, seq_len, d_v]
            attention_weights: 注意力权重 [batch_size, seq_len, seq_len]
        """
        # 计算注意力分数
        d_k = Q.shape[-1]
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
        
        # 应用掩码(可选)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        
        # Softmax归一化
        attention_weights = F.softmax(scores, dim=-1)
        
        # 加权求和
        output = torch.matmul(attention_weights, V)
        
        return output, attention_weights

# 演示注意力机制
print("="*60)
print("     欢迎进入Transformer的世界")
print("="*60)
print("\n第一个实验:理解自注意力机制\n")

# 创建简单的输入序列
torch.manual_seed(42)
batch_size = 1
seq_len = 5
d_model = 8

# 模拟输入:一个句子 "I love deep learning ."
input_embeddings = torch.randn(batch_size, seq_len, d_model)

print(f"输入形状: {input_embeddings.shape}")
print(f"批次大小: {batch_size}")
print(f"序列长度: {seq_len} (对应5个词)")
print(f"嵌入维度: {d_model}\n")

# 初始化Q, K, V(简化版:直接使用输入)
Q = K = V = input_embeddings

# 计算注意力
attention = SimpleAttention()
output, attn_weights = attention.scaled_dot_product_attention(Q, K, V)

print(f"输出形状: {output.shape}")
print(f"注意力权重形状: {attn_weights.shape}\n")

# 可视化注意力权重
plt.figure(figsize=(10, 8))
sns.heatmap(
    attn_weights[0].detach().numpy(),
    annot=True,
    fmt='.3f',
    cmap='YlOrRd',
    xticklabels=['I', 'love', 'deep', 'learning', '.'],
    yticklabels=['I', 'love', 'deep', 'learning', '.'],
    cbar_kws={'label': '注意力权重'}
)
plt.title('自注意力权重矩阵可视化', fontsize=16, fontweight='bold')
plt.xlabel('Key(被关注的词)', fontsize=12)
plt.ylabel('Query(当前词)', fontsize=12)
plt.tight_layout()
plt.show()

print("解读注意力矩阵:")
print("  - 对角线高亮:每个词最关注自己")
print("  - 'love' 和 'learning' 有较强关联")
print("  - 句号 '.' 关注度较低(语义较弱)\n")

# 对比:不同词的注意力分布
fig, axes = plt.subplots(1, 3, figsize=(16, 4))

words = ['I', 'love', 'learning']
indices = [0, 1, 3]

for ax, word, idx in zip(axes, words, indices):
    weights = attn_weights[0, idx].detach().numpy()
    
    ax.bar(range(seq_len), weights, color='steelblue', alpha=0.7)
    ax.set_xticks(range(seq_len))
    ax.set_xticklabels(['I', 'love', 'deep', 'learning', '.'])
    ax.set_ylabel('注意力权重', fontsize=11)
    ax.set_title(f'词 "{word}" 的注意力分布', fontsize=13, fontweight='bold')
    ax.grid(True, alpha=0.3, axis='y')
    ax.set_ylim([0, 1])

plt.tight_layout()
plt.show()

print("="*60)

1.2 为什么Transformer如此重要?

三个根本性突破

  1. 并行化计算:不再像RNN那样顺序处理
  2. 长距离依赖:通过注意力直接建立全局连接
  3. 可扩展性:规模越大,性能越强
# 对比RNN与Transformer的计算复杂度
class ComplexityAnalyzer:
    """模型复杂度分析器"""
    
    def __init__(self):
        pass
    
    def rnn_complexity(self, seq_len, hidden_dim):
        """
        RNN复杂度分析
        时间:O(seq_len * hidden_dim^2)
        空间:O(seq_len * hidden_dim)
        """
        time_complexity = seq_len * (hidden_dim ** 2)
        space_complexity = seq_len * hidden_dim
        parallelizable = False
        
        return {
            'model': 'RNN',
            'time': time_complexity,
            'space': space_complexity,
            'parallel': parallelizable,
            'max_path_length': seq_len  # 信息传递需要经过整个序列
        }
    
    def transformer_complexity(self, seq_len, d_model):
        """
        Transformer复杂度分析
        时间:O(seq_len^2 * d_model)
        空间:O(seq_len^2)
        """
        time_complexity = (seq_len ** 2) * d_model
        space_complexity = seq_len ** 2
        parallelizable = True
        
        return {
            'model': 'Transformer',
            'time': time_complexity,
            'space': space_complexity,
            'parallel': parallelizable,
            'max_path_length': 1  # 任意两个位置直接连接
        }
    
    def visualize_comparison(self):
        """可视化对比"""
        seq_lengths = np.array([10, 50, 100, 200, 500, 1000])
        d_model = 512
        
        rnn_times = []
        transformer_times = []
        
        for seq_len in seq_lengths:
            rnn_result = self.rnn_complexity(seq_len, d_model)
            trans_result = self.transformer_complexity(seq_len, d_model)
            
            rnn_times.append(rnn_result['time'])
            transformer_times.append(trans_result['time'])
        
        # 归一化到同一尺度
        rnn_times = np.array(rnn_times) / 1e9
        transformer_times = np.array(transformer_times) / 1e9
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
        
        # 计算复杂度对比
        ax1.plot(seq_lengths, rnn_times, 
                marker='o', linewidth=2.5, label='RNN: O(n·d²)', color='red')
        ax1.plot(seq_lengths, transformer_times, 
                marker='s', linewidth=2.5, label='Transformer: O(n²·d)', color='blue')
        
        ax1.set_xlabel('序列长度', fontsize=12)
        ax1.set_ylabel('计算量 (×10⁹ FLOPs)', fontsize=12)
        ax1.set_title('计算复杂度对比', fontsize=14, fontweight='bold')
        ax1.legend(fontsize=11)
        ax1.grid(True, alpha=0.3)
        ax1.set_yscale('log')
        
        # 并行化能力
        models = ['RNN\n(顺序)', 'Transformer\n(并行)']
        parallelism = [1, 10]  # Transformer可以并行10倍
        colors = ['red', 'blue']
        
        ax2.bar(models, parallelism, color=colors, alpha=0.7, width=0.5)
        ax2.set_ylabel('相对并行度', fontsize=12)
        ax2.set_title('并行计算能力对比', fontsize=14, fontweight='bold')
        ax2.grid(True, alpha=0.3, axis='y')
        
        for i, v in enumerate(parallelism):
            ax2.text(i, v + 0.3, f'{v}×', ha='center', fontsize=13, fontweight='bold')
        
        plt.tight_layout()
        plt.show()

print("\n实验2:RNN vs Transformer 复杂度分析\n")

analyzer = ComplexityAnalyzer()
analyzer.visualize_comparison()

# 详细对比
print("\n详细对比表(序列长度=512,d_model=512):\n")
print(f"{'指标':<20} {'RNN':<20} {'Transformer':<20}")
print("-"*60)

rnn_res = analyzer.rnn_complexity(512, 512)
trans_res = analyzer.transformer_complexity(512, 512)

print(f"{'时间复杂度':<20} {rnn_res['time']/1e9:.2f}G FLOPs   {trans_res['time']/1e9:.2f}G FLOPs")
print(f"{'空间复杂度':<20} {rnn_res['space']/1e6:.2f}M          {trans_res['space']/1e6:.2f}M")
print(f"{'并行能力':<20} {'❌ 顺序':<20} {'✅ 完全并行':<20}")
print(f"{'最大路径长度':<20} {rnn_res['max_path_length']:<20} {trans_res['max_path_length']:<20}")

print("\n关键洞察:")
print("  ✓ Transformer在序列较短时更高效")
print("  ✓ RNN在超长序列时空间占用更小")
print("  ✓ Transformer的并行能力使其在GPU上飞速运行")
print("  ✓ 注意力机制实现O(1)的最大路径长度\n")

🧠 第二章:注意力机制的诞生 - 从RNN到Self-Attention

2.1 RNN的困境:消失的梯度与遗忘的记忆

class VanillaRNN(nn.Module):
    """原始RNN实现(展示梯度消失问题)"""
    
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        
        # 权重矩阵
        self.W_ih = nn.Linear(input_size, hidden_size, bias=False)
        self.W_hh = nn.Linear(hidden_size, hidden_size, bias=False)
        
    def forward(self, x, h_0=None):
        """
        Args:
            x: [batch, seq_len, input_size]
            h_0: 初始隐状态 [batch, hidden_size]
        
        Returns:
            outputs: [batch, seq_len, hidden_size]
            hidden_states: 所有时间步的隐状态
        """
        batch_size, seq_len, _ = x.shape
        
        if h_0 is None:
            h_t = torch.zeros(batch_size, self.hidden_size)
        else:
            h_t = h_0
        
        outputs = []
        hidden_states = [h_t.clone()]
        
        for t in range(seq_len):
            x_t = x[:, t, :]
            h_t = torch.tanh(self.W_ih(x_t) + self.W_hh(h_t))
            outputs.append(h_t.unsqueeze(1))
            hidden_states.append(h_t.clone())
        
        outputs = torch.cat(outputs, dim=1)
        
        return outputs, hidden_states
    
    def analyze_gradient_flow(self, seq_len=50):
        """分析梯度流"""
        # 创建一个长序列
        x = torch.randn(1, seq_len, self.hidden_size, requires_grad=True)
        outputs, _ = self.forward(x)
        
        # 反向传播
        loss = outputs[:, -1, :].sum()  # 只关注最后一个输出
        loss.backward()
        
        # 计算每个时间步输入的梯度范数
        gradient_norms = []
        
        for t in range(seq_len):
            if x.grad is not None:
                grad_norm = x.grad[:, t, :].norm().item()
                gradient_norms.append(grad_norm)
        
        return gradient_norms

print("\n实验3:RNN的梯度消失问题\n")

# 创建RNN
rnn = VanillaRNN(input_size=128, hidden_size=128)

# 分析梯度流
gradient_norms = rnn.analyze_gradient_flow(seq_len=50)

# 可视化
plt.figure(figsize=(12, 6))

plt.plot(range(len(gradient_norms)), gradient_norms, 
         linewidth=2.5, color='red', marker='o', markersize=4)

plt.xlabel('时间步(从起始到终点)', fontsize=12)
plt.ylabel('梯度范数', fontsize=12)
plt.title('RNN梯度消失现象:越早的时间步梯度越小', fontsize=14, fontweight='bold')
plt.grid(True, alpha=0.3)
plt.yscale('log')

# 标注关键点
plt.axhline(y=1e-5, color='orange', linestyle='--', 
           label='梯度接近消失阈值', linewidth=2)
plt.legend(fontsize=11)

plt.tight_layout()
plt.show()

print("观察结果:")
print(f"  - 起始时间步梯度范数: {gradient_norms[0]:.2e}")
print(f"  - 中间时间步梯度范数: {gradient_norms[25]:.2e}")
print(f"  - 最终时间步梯度范数: {gradient_norms[-1]:.2e}")
print(f"  - 衰减比例: {gradient_norms[0]/gradient_norms[-1]:.2f}×\n")

print("⚠️ 问题:RNN难以学习长距离依赖!")
print("   原因:梯度在反向传播时呈指数衰减\n")

2.2 注意力机制的提出:Bahdanau Attention (2014)

class BahdanauAttention(nn.Module):
    """Bahdanau注意力机制(加性注意力)"""
    
    def __init__(self, hidden_dim):
        super().__init__()
        
        # 注意力权重计算
        self.W_h = nn.Linear(hidden_dim, hidden_dim, bias=False)  # 编码器隐状态
        self.W_s = nn.Linear(hidden_dim, hidden_dim, bias=False)  # 解码器隐状态
        self.v = nn.Linear(hidden_dim, 1, bias=False)  # 注意力分数
    
    def forward(self, decoder_hidden, encoder_outputs):
        """
        Args:
            decoder_hidden: [batch, hidden_dim] 当前解码器状态
            encoder_outputs: [batch, seq_len, hidden_dim] 编码器所有输出
        
        Returns:
            context: [batch, hidden_dim] 上下文向量
            attention_weights: [batch, seq_len] 注意力权重
        """
        batch_size, seq_len, hidden_dim = encoder_outputs.shape
        
        # 扩展解码器隐状态以匹配序列长度
        decoder_hidden_expanded = decoder_hidden.unsqueeze(1).repeat(1, seq_len, 1)
        
        # 计算注意力分数 (加性模型)
        # score = v^T * tanh(W_h * h + W_s * s)
        energy = torch.tanh(
            self.W_h(encoder_outputs) + self.W_s(decoder_hidden_expanded)
        )
        
        attention_scores = self.v(energy).squeeze(-1)  # [batch, seq_len]
        
        # Softmax归一化
        attention_weights = F.softmax(attention_scores, dim=1)
        
        # 计算上下文向量(加权和)
        context = torch.bmm(
            attention_weights.unsqueeze(1),  # [batch, 1, seq_len]
            encoder_outputs  # [batch, seq_len, hidden_dim]
        ).squeeze(1)  # [batch, hidden_dim]
        
        return context, attention_weights


class Seq2SeqWithAttention(nn.Module):
    """带注意力的Seq2Seq模型"""
    
    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        
        # 编码器
        self.encoder = nn.GRU(embed_dim, hidden_dim, batch_first=True)
        
        # 解码器
        self.decoder_cell = nn.GRUCell(embed_dim + hidden_dim, hidden_dim)
        
        # 注意力机制
        self.attention = BahdanauAttention(hidden_dim)
        
        # 输出层
        self.fc_out = nn.Linear(hidden_dim, vocab_size)
    
    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """
        Args:
            source: [batch, src_len]
            target: [batch, tgt_len]
        """
        batch_size = source.shape[0]
        tgt_len = target.shape[1]
        vocab_size = self.fc_out.out_features
        
        # 编码
        src_embedded = self.embedding(source)
        encoder_outputs, hidden = self.encoder(src_embedded)
        
        # 解码器初始状态
        decoder_hidden = hidden.squeeze(0)
        
        # 存储输出和注意力权重
        outputs = torch.zeros(batch_size, tgt_len, vocab_size)
        attention_weights_all = []
        
        # 第一个输入(<SOS>)
        decoder_input = target[:, 0]
        
        for t in range(1, tgt_len):
            # 嵌入当前输入
            embedded = self.embedding(decoder_input)  # [batch, embed_dim]
            
            # 计算注意力
            context, attn_weights = self.attention(decoder_hidden, encoder_outputs)
            attention_weights_all.append(attn_weights)
            
            # 拼接输入和上下文
            decoder_input_combined = torch.cat([embedded, context], dim=1)
            
            # 解码器前向
            decoder_hidden = self.decoder_cell(decoder_input_combined, decoder_hidden)
            
            # 预测
            output = self.fc_out(decoder_hidden)
            outputs[:, t, :] = output
            
            # Teacher forcing
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(1)
            decoder_input = target[:, t] if teacher_force else top1
        
        return outputs, torch.stack(attention_weights_all, dim=1)


print("\n实验4:Bahdanau注意力机制演示\n")

# 创建模型
vocab_size = 1000
embed_dim = 128
hidden_dim = 256

model = Seq2SeqWithAttention(vocab_size, embed_dim, hidden_dim)

# 模拟翻译任务
source = torch.randint(0, vocab_size, (1, 10))  # 源句子:10个词
target = torch.randint(0, vocab_size, (1, 8))   # 目标句子:8个词

# 前向传播
outputs, attention_weights = model(source, target)

print(f"源句子长度: {source.shape[1]}")
print(f"目标句子长度: {target.shape[1]}")
print(f"注意力权重形状: {attention_weights.shape}")
print(f"  - [batch_size, target_len-1, source_len]\n")

# 可视化注意力对齐
plt.figure(figsize=(10, 8))

# 模拟英文到中文翻译
source_words = ['I', 'love', 'natural', 'language', 'processing', 
                'and', 'deep', 'learning', '.', '<EOS>']
target_words = ['我', '喜欢', '自然', '语言', '处理', '和', '深度', '学习']

sns.heatmap(
    attention_weights[0].detach().numpy(),
    annot=True,
    fmt='.2f',
    cmap='Blues',
    xticklabels=source_words,
    yticklabels=target_words[1:],  # 去掉<SOS>
    cbar_kws={'label': '注意力权重'}
)

plt.title('注意力对齐矩阵(英→中翻译)', fontsize=14, fontweight='bold')
plt.xlabel('源语言(英文)', fontsize=12)
plt.ylabel('目标语言(中文)', fontsize=12)
plt.tight_layout()
plt.show()

print("注意力对齐解读:")
print("  ✓ '我' 主要关注 'I'")
print("  ✓ '自然语言处理' 分散关注 'natural', 'language', 'processing'")
print("  ✓ 对齐矩阵近似单调(语序相似)\n")

2.3 从Bahdanau到Self-Attention的演进

class ScaledDotProductAttention(nn.Module):
    """缩放点积注意力(Transformer的核心)"""
    
    def __init__(self, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, Q, K, V, mask=None):
        """
        Args:
            Q: [batch, n_heads, seq_len, d_k]
            K: [batch, n_heads, seq_len, d_k]
            V: [batch, n_heads, seq_len, d_v]
            mask: [batch, 1, 1, seq_len] 或 [batch, 1, seq_len, seq_len]
        
        Returns:
            output: [batch, n_heads, seq_len, d_v]
            attention: [batch, n_heads, seq_len, seq_len]
        """
        d_k = Q.size(-1)
        
        # 计算注意力分数
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
        
        # 应用掩码
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        
        # Softmax
        attention = F.softmax(scores, dim=-1)
        attention = self.dropout(attention)
        
        # 加权求和
        output = torch.matmul(attention, V)
        
        return output, attention


def compare_attention_mechanisms():
    """对比不同注意力机制"""
    
    print("\n实验5:注意力机制演进对比\n")
    
    # 参数设置
    batch_size = 2
    seq_len = 10
    hidden_dim = 64
    
    # 创建测试数据
    query = torch.randn(batch_size, seq_len, hidden_dim)
    key = torch.randn(batch_size, seq_len, hidden_dim)
    value = torch.randn(batch_size, seq_len, hidden_dim)
    
    # 1. Bahdanau Attention (加性)
    bahdanau = BahdanauAttention(hidden_dim)
    
    # 2. Scaled Dot-Product Attention (乘性)
    scaled_dot = ScaledDotProductAttention()
    
    # 对比
    mechanisms = {
        'Bahdanau (2014)\n加性注意力': {
            'formula': 'score = v^T tanh(W_q Q + W_k K)',
            'complexity': 'O(n·d²)',
            'trainable_params': 3 * hidden_dim * hidden_dim
        },
        'Scaled Dot-Product (2017)\n乘性注意力': {
            'formula': 'score = Q·K^T / √d_k',
            'complexity': 'O(n²·d)',
            'trainable_params': 0  # 不引入额外参数
        }
    }
    
    print("="*70)
    print(f"{'机制':<30} {'公式':<30} {'复杂度':<15}")
    print("="*70)
    
    for name, info in mechanisms.items():
        print(f"{name:<30} {info['formula']:<30} {info['complexity']:<15}")
    
    print("="*70)
    print("\n关键差异:")
    print("  1. Bahdanau需要学习三组权重矩阵")
    print("  2. Scaled Dot-Product仅需缩放因子(无需额外参数)")
    print("  3. 点积注意力计算效率更高(矩阵乘法优化)")
    print("  4. 缩放因子√d_k防止梯度消失\n")
    
    # 可视化计算流程
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
    
    # Bahdanau流程
    ax1.text(0.5, 0.9, 'Bahdanau注意力', ha='center', fontsize=14, 
            fontweight='bold', transform=ax1.transAxes)
    
    steps_bahdanau = [
        '1. 线性变换: h_enc·W_h',
        '2. 线性变换: h_dec·W_s',
        '3. 加法: W_h·h + W_s·s',
        '4. 激活: tanh(...)',
        '5. 映射: v^T·tanh(...)',
        '6. Softmax归一化',
        '7. 加权求和'
    ]
    
    for i, step in enumerate(steps_bahdanau):
        y = 0.75 - i * 0.1
        ax1.text(0.1, y, step, fontsize=11, transform=ax1.transAxes,
                bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.5))
    
    ax1.axis('off')
    
    # Scaled Dot-Product流程
    ax2.text(0.5, 0.9, 'Scaled Dot-Product注意力', ha='center', fontsize=14,
            fontweight='bold', transform=ax2.transAxes)
    
    steps_scaled = [
        '1. 矩阵乘法: Q·K^T',
        '2. 缩放: score / √d_k',
        '3. Softmax归一化',
        '4. 加权求和: attn·V'
    ]
    
    for i, step in enumerate(steps_scaled):
        y = 0.75 - i * 0.1
        ax2.text(0.1, y, step, fontsize=11, transform=ax2.transAxes,
                bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.5))
    
    # 标注优势
    ax2.text(0.5, 0.25, '✓ 计算步骤更少\n✓ 无额外参数\n✓ GPU加速友好', 
            ha='center', fontsize=12, transform=ax2.transAxes,
            bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.3))
    
    ax2.axis('off')
    
    plt.tight_layout()
    plt.show()

compare_attention_mechanisms()

🏗️ 第三章:Transformer核心架构 - 层层剖析

3.1 完整的Transformer架构

class MultiHeadAttention(nn.Module):
    """多头注意力机制"""
    
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        
        assert d_model % n_heads == 0, "d_model必须能被n_heads整除"
        
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        
        # Q, K, V线性变换
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        
        # 输出线性变换
        self.W_o = nn.Linear(d_model, d_model)
        
        # 注意力计算
        self.attention = ScaledDotProductAttention(dropout)
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, Q, K, V, mask=None):
        """
        Args:
            Q, K, V: [batch, seq_len, d_model]
            mask: [batch, 1, seq_len] or [batch, seq_len, seq_len]
        
        Returns:
            output: [batch, seq_len, d_model]
            attention: [batch, n_heads, seq_len, seq_len]
        """
        batch_size = Q.size(0)
        
        # 1. 线性变换并分割成多头
        # [batch, seq_len, d_model] -> [batch, seq_len, n_heads, d_k]
        Q = self.W_q(Q).view(batch_size, -1, self.n_heads, self.d_k)
        K = self.W_k(K).view(batch_size, -1, self.n_heads, self.d_k)
        V = self.W_v(V).view(batch_size, -1, self.n_heads, self.d_k)
        
        # 转置以便多头并行计算
        # [batch, n_heads, seq_len, d_k]
        Q = Q.transpose(1, 2)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)
        
        # 2. 计算注意力
        if mask is not None:
            mask = mask.unsqueeze(1)  # [batch, 1, seq_len, seq_len]
        
        x, attention = self.attention(Q, K, V, mask)
        
        # 3. 拼接多头
        # [batch, n_heads, seq_len, d_k] -> [batch, seq_len, n_heads, d_k]
        x = x.transpose(1, 2).contiguous()
        
        # [batch, seq_len, n_heads, d_k] -> [batch, seq_len, d_model]
        x = x.view(batch_size, -1, self.d_model)
        
        # 4. 最终线性变换
        output = self.W_o(x)
        
        return output, attention


class PositionwiseFeedForward(nn.Module):
    """位置前馈网络(FFN)"""
    
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        """
        FFN(x) = max(0, x·W_1 + b_1)·W_2 + b_2
        
        Args:
            x: [batch, seq_len, d_model]
        
        Returns:
            output: [batch, seq_len, d_model]
        """
        # [batch, seq_len, d_model] -> [batch, seq_len, d_ff]
        x = self.linear1(x)
        x = F.relu(x)
        x = self.dropout(x)
        
        # [batch, seq_len, d_ff] -> [batch, seq_len, d_model]
        x = self.linear2(x)
        
        return x


class EncoderLayer(nn.Module):
    """Transformer编码器层"""
    
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        
        # 多头自注意力
        self.self_attention = MultiHeadAttention(d_model, n_heads, dropout)
        
        # 前馈网络
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        
        # 层归一化
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        
        # Dropout
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask=None):
        """
        Args:
            x: [batch, seq_len, d_model]
            mask: [batch, 1, seq_len]
        
        Returns:
            output: [batch, seq_len, d_model]
            attention: [batch, n_heads, seq_len, seq_len]
        """
        # 1. 多头自注意力 + 残差连接 + 层归一化
        attn_output, attention = self.self_attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        
        # 2. 前馈网络 + 残差连接 + 层归一化
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        
        return x, attention


class DecoderLayer(nn.Module):
    """Transformer解码器层"""
    
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        
        # 掩码多头自注意力
        self.self_attention = MultiHeadAttention(d_model, n_heads, dropout)
        
        # 编码器-解码器注意力
        self.cross_attention = MultiHeadAttention(d_model, n_heads, dropout)
        
        # 前馈网络
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        
        # 层归一化
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        
        # Dropout
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
        """
        Args:
            x: [batch, tgt_len, d_model] 解码器输入
            encoder_output: [batch, src_len, d_model] 编码器输出
            src_mask: [batch, 1, src_len] 源序列掩码
            tgt_mask: [batch, tgt_len, tgt_len] 目标序列掩码(因果掩码)
        
        Returns:
            output: [batch, tgt_len, d_model]
            self_attn: 自注意力权重
            cross_attn: 交叉注意力权重
        """
        # 1. 掩码自注意力
        self_attn_output, self_attn = self.self_attention(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attn_output))
        
        # 2. 编码器-解码器注意力
        cross_attn_output, cross_attn = self.cross_attention(
            x, encoder_output, encoder_output, src_mask
        )
        x = self.norm2(x + self.dropout(cross_attn_output))
        
        # 3. 前馈网络
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        
        return x, self_attn, cross_attn


print("\n实验6:Transformer编码器层详细剖析\n")

# 创建编码器层
d_model = 512
n_heads = 8
d_ff = 2048
dropout = 0.1

encoder_layer = EncoderLayer(d_model, n_heads, d_ff, dropout)

# 测试输入
batch_size = 2
seq_len = 10
x = torch.randn(batch_size, seq_len, d_model)

# 前向传播
output, attention = encoder_layer(x)

print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
print(f"注意力权重形状: {attention.shape}")
print(f"  - [batch_size, n_heads, seq_len, seq_len]\n")

# 分析参数量
total_params = sum(p.numel() for p in encoder_layer.parameters())
print(f"编码器层总参数量: {total_params:,}")

# 详细分解
print("\n参数分解:")
mha_params = sum(p.numel() for p in encoder_layer.self_attention.parameters())
ffn_params = sum(p.numel() for p in encoder_layer.feed_forward.parameters())
ln_params = sum(p.numel() for p in encoder_layer.norm1.parameters()) + \
            sum(p.numel() for p in encoder_layer.norm2.parameters())

print(f"  - 多头注意力: {mha_params:,} ({mha_params/total_params*100:.1f}%)")
print(f"  - 前馈网络:   {ffn_params:,} ({ffn_params/total_params*100:.1f}%)")
print(f"  - 层归一化:   {ln_params:,} ({ln_params/total_params*100:.1f}%)\n")

# 可视化信息流
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 1. 输入数据
axes[0, 0].imshow(x[0].detach().numpy().T, aspect='auto', cmap='viridis')
axes[0, 0].set_title('输入嵌入 [seq_len, d_model]', fontweight='bold')
axes[0, 0].set_xlabel('序列位置')
axes[0, 0].set_ylabel('嵌入维度')

# 2. 注意力权重(第一个头)
axes[0, 1].imshow(attention[0, 0].detach().numpy(), cmap='YlOrRd')
axes[0, 1].set_title('注意力权重(Head 1)', fontweight='bold')
axes[0, 1].set_xlabel('Key位置')
axes[0, 1].set_ylabel('Query位置')

# 3. 所有头的平均注意力
avg_attention = attention[0].mean(dim=0).detach().numpy()
axes[1, 0].imshow(avg_attention, cmap='Blues')
axes[1, 0].set_title('平均注意力权重(8个头)', fontweight='bold')
axes[1, 0].set_xlabel('Key位置')
axes[1, 0].set_ylabel('Query位置')

# 4. 输出数据
axes[1, 1].imshow(output[0].detach().numpy().T, aspect='auto', cmap='viridis')
axes[1, 1].set_title('输出表示 [seq_len, d_model]', fontweight='bold')
axes[1, 1].set_xlabel('序列位置')
axes[1, 1].set_ylabel('嵌入维度')

plt.tight_layout()
plt.show()

print("关键设计要点:")
print("  ✓ 残差连接:缓解梯度消失,使深层网络训练稳定")
print("  ✓ 层归一化:加速收敛,稳定训练过程")
print("  ✓ 多头注意力:捕获不同子空间的关系")
print("  ✓ FFN:为每个位置独立添加非线性变换\n")

3.2 完整的Transformer模型

class PositionalEncoding(nn.Module):
    """位置编码"""
    
    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        
        self.dropout = nn.Dropout(dropout)
        
        # 创建位置编码矩阵
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * 
            (-math.log(10000.0) / d_model)
        )
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0)  # [1, max_len, d_model]
        
        # 注册为buffer(不参与训练)
        self.register_buffer('pe', pe)
        
    def forward(self, x):
        """
        Args:
            x: [batch, seq_len, d_model]
        
        Returns:
            output: [batch, seq_len, d_model]
        """
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)


class Transformer(nn.Module):
    """完整的Transformer模型"""
    
    def __init__(
        self,
        src_vocab_size,
        tgt_vocab_size,
        d_model=512,
        n_heads=8,
        n_encoder_layers=6,
        n_decoder_layers=6,
        d_ff=2048,
        dropout=0.1,
        max_len=5000
    ):
        super().__init__()
        
        # 嵌入层
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        
        # 位置编码
        self.pos_encoding = PositionalEncoding(d_model, max_len, dropout)
        
        # 编码器
        self.encoder_layers = nn.ModuleList([
            EncoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(n_encoder_layers)
        ])
        
        # 解码器
        self.decoder_layers = nn.ModuleList([
            DecoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(n_decoder_layers)
        ])
        
        # 输出层
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        
        self.dropout = nn.Dropout(dropout)
        
        # 初始化参数
        self._init_parameters()
        
    def _init_parameters(self):
        """参数初始化"""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)
    
    def make_src_mask(self, src):
        """创建源序列掩码(填充掩码)"""
        # src: [batch, src_len]
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        # [batch, 1, 1, src_len]
        return src_mask
    
    def make_tgt_mask(self, tgt):
        """创建目标序列掩码(因果掩码 + 填充掩码)"""
        # tgt: [batch, tgt_len]
        batch_size, tgt_len = tgt.shape
        
        # 填充掩码
        tgt_pad_mask = (tgt != 0).unsqueeze(1).unsqueeze(2)
        # [batch, 1, 1, tgt_len]
        
        # 因果掩码(下三角矩阵)
        tgt_sub_mask = torch.tril(
            torch.ones((tgt_len, tgt_len), device=tgt.device)
        ).bool()
        # [tgt_len, tgt_len]
        
        # 合并掩码
        tgt_mask = tgt_pad_mask & tgt_sub_mask
        # [batch, 1, tgt_len, tgt_len]
        
        return tgt_mask
    
    def encode(self, src, src_mask):
        """编码器前向传播"""
        # 嵌入 + 位置编码
        x = self.src_embedding(src) * math.sqrt(self.src_embedding.embedding_dim)
        x = self.pos_encoding(x)
        
        # 通过所有编码器层
        encoder_attentions = []
        for layer in self.encoder_layers:
            x, attention = layer(x, src_mask)
            encoder_attentions.append(attention)
        
        return x, encoder_attentions
    
    def decode(self, tgt, encoder_output, src_mask, tgt_mask):
        """解码器前向传播"""
        # 嵌入 + 位置编码
        x = self.tgt_embedding(tgt) * math.sqrt(self.tgt_embedding.embedding_dim)
        x = self.pos_encoding(x)
        
        # 通过所有解码器层
        decoder_self_attentions = []
        decoder_cross_attentions = []
        
        for layer in self.decoder_layers:
            x, self_attn, cross_attn = layer(x, encoder_output, src_mask, tgt_mask)
            decoder_self_attentions.append(self_attn)
            decoder_cross_attentions.append(cross_attn)
        
        return x, decoder_self_attentions, decoder_cross_attentions
    
    def forward(self, src, tgt):
        """
        Args:
            src: [batch, src_len]
            tgt: [batch, tgt_len]
        
        Returns:
            output: [batch, tgt_len, tgt_vocab_size]
            attentions: 所有注意力权重
        """
        # 创建掩码
        src_mask = self.make_src_mask(src)
        tgt_mask = self.make_tgt_mask(tgt)
        
        # 编码
        encoder_output, enc_attns = self.encode(src, src_mask)
        
        # 解码
        decoder_output, dec_self_attns, dec_cross_attns = self.decode(
            tgt, encoder_output, src_mask, tgt_mask
        )
        
        # 输出投影
        output = self.fc_out(decoder_output)
        
        attentions = {
            'encoder': enc_attns,
            'decoder_self': dec_self_attns,
            'decoder_cross': dec_cross_attns
        }
        
        return output, attentions


print("\n实验7:完整Transformer模型构建\n")

# 创建Transformer
src_vocab_size = 10000
tgt_vocab_size = 10000

transformer = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=512,
    n_heads=8,
    n_encoder_layers=6,
    n_decoder_layers=6,
    d_ff=2048,
    dropout=0.1
)

# 模拟输入
src = torch.randint(1, src_vocab_size, (2, 20))  # [batch=2, src_len=20]
tgt = torch.randint(1, tgt_vocab_size, (2, 15))  # [batch=2, tgt_len=15]

print(f"源序列形状: {src.shape}")
print(f"目标序列形状: {tgt.shape}\n")

# 前向传播
output, attentions = transformer(src, tgt)

print(f"输出形状: {output.shape}")
print(f"  - [batch_size, tgt_len, tgt_vocab_size]\n")

# 统计参数
total_params = sum(p.numel() for p in transformer.parameters())
trainable_params = sum(p.numel() for p in transformer.parameters() if p.requires_grad)

print(f"总参数量: {total_params:,}")
print(f"可训练参数: {trainable_params:,}\n")

# 详细分解
print("参数分布:")
embedding_params = sum(p.numel() for p in transformer.src_embedding.parameters()) + \
                   sum(p.numel() for p in transformer.tgt_embedding.parameters())
encoder_params = sum(p.numel() for layer in transformer.encoder_layers 
                    for p in layer.parameters())
decoder_params = sum(p.numel() for layer in transformer.decoder_layers 
                    for p in layer.parameters())
output_params = sum(p.numel() for p in transformer.fc_out.parameters())

print(f"  嵌入层:   {embedding_params:,} ({embedding_params/total_params*100:.1f}%)")
print(f"  编码器:   {encoder_params:,} ({encoder_params/total_params*100:.1f}%)")
print(f"  解码器:   {decoder_params:,} ({decoder_params/total_params*100:.1f}%)")
print(f"  输出层:   {output_params:,} ({output_params/total_params*100:.1f}%)\n")

# 可视化架构
fig, axes = plt.subplots(1, 3, figsize=(18, 8))

# 编码器注意力(第一层)
enc_attn = attentions['encoder'][0][0, 0].detach().numpy()  # [seq_len, seq_len]
im1 = axes[0].imshow(enc_attn, cmap='Blues', aspect='auto')
axes[0].set_title('编码器自注意力(Layer 1, Head 1)', fontweight='bold')
axes[0].set_xlabel('Key位置')
axes[0].set_ylabel('Query位置')
plt.colorbar(im1, ax=axes[0], label='注意力权重')

# 解码器自注意力(第一层)
dec_self_attn = attentions['decoder_self'][0][0, 0].detach().numpy()
im2 = axes[1].imshow(dec_self_attn, cmap='Greens', aspect='auto')
axes[1].set_title('解码器自注意力(因果掩码)', fontweight='bold')
axes[1].set_xlabel('Key位置')
axes[1].set_ylabel('Query位置')
plt.colorbar(im2, ax=axes[1], label='注意力权重')

# 解码器交叉注意力(第一层)
dec_cross_attn = attentions['decoder_cross'][0][0, 0].detach().numpy()
im3 = axes[2].imshow(dec_cross_attn, cmap='Reds', aspect='auto')
axes[2].set_title('解码器交叉注意力(编码器-解码器)', fontweight='bold')
axes[2].set_xlabel('编码器Key位置')
axes[2].set_ylabel('解码器Query位置')
plt.colorbar(im3, ax=axes[2], label='注意力权重')

plt.tight_layout()
plt.show()

print("架构特点总结:")
print("  ✓ 编码器:双向自注意力,捕获全局上下文")
print("  ✓ 解码器:因果自注意力,防止未来信息泄漏")
print("  ✓ 交叉注意力:对齐源序列和目标序列")
print("  ✓ 6层堆叠:逐步抽象特征表示\n")

📍 第四章:位置编码 - 序列信息的艺术

4.1 为什么需要位置编码?

def demonstrate_position_importance():
    """演示位置信息的重要性"""
    
    print("\n实验8:位置编码的必要性\n")
    
    # 两个语义完全不同的句子(词序不同)
    sentence1 = "The dog bit the man"
    sentence2 = "The man bit the dog"
    
    print("示例句子:")
    print(f"  句子1: {sentence1}")
    print(f"  句子2: {sentence2}")
    print(f"  词汇相同,但语义完全相反!\n")
    
    # 模拟词袋模型(无位置信息)
    from collections import Counter
    
    words1 = sentence1.lower().split()
    words2 = sentence2.lower().split()
    
    bow1 = Counter(words1)
    bow2 = Counter(words2)
    
    print("词袋表示(Bag of Words):")
    print(f"  句子1: {dict(bow1)}")
    print(f"  句子2: {dict(bow2)}")
    print(f"  结果:完全相同!❌\n")
    
    print("⚠️ 问题:自注意力机制本身是置换不变的")
    print("   即:Attention(x_1, x_2, x_3) = Attention(x_2, x_1, x_3)")
    print("   解决:必须注入位置信息!\n")

demonstrate_position_importance()

4.2 正弦位置编码详解

class PositionalEncodingAnalyzer:
    """位置编码分析器"""
    
    def __init__(self, d_model=512, max_len=100):
        self.d_model = d_model
        self.max_len = max_len
        
        # 生成位置编码矩阵
        self.pe = self.create_positional_encoding()
    
    def create_positional_encoding(self):
        """创建正弦位置编码"""
        pe = torch.zeros(self.max_len, self.d_model)
        position = torch.arange(0, self.max_len, dtype=torch.float).unsqueeze(1)
        
        div_term = torch.exp(
            torch.arange(0, self.d_model, 2).float() * 
            (-math.log(10000.0) / self.d_model)
        )
        
        # PE(pos, 2i) = sin(pos / 10000^(2i/d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        
        # PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
        pe[:, 1::2] = torch.cos(position * div_term)
        
        return pe
    
    def visualize_encoding(self):
        """可视化位置编码"""
        fig, axes = plt.subplots(2, 2, figsize=(16, 10))
        
        # 1. 完整位置编码矩阵
        im1 = axes[0, 0].imshow(
            self.pe.numpy().T,
            cmap='RdBu',
            aspect='auto',
            extent=[0, self.max_len, 0, self.d_model]
        )
        axes[0, 0].set_xlabel('位置', fontsize=12)
        axes[0, 0].set_ylabel('嵌入维度', fontsize=12)
        axes[0, 0].set_title('位置编码矩阵 [max_len, d_model]', 
                            fontsize=14, fontweight='bold')
        plt.colorbar(im1, ax=axes[0, 0])
        
        # 2. 不同位置的编码向量
        positions = [0, 10, 50, 99]
        for pos in positions:
            axes[0, 1].plot(
                self.pe[pos, :64].numpy(),  # 只显示前64维
                label=f'位置 {pos}',
                linewidth=2,
                alpha=0.7
            )
        
        axes[0, 1].set_xlabel('维度索引', fontsize=12)
        axes[0, 1].set_ylabel('编码值', fontsize=12)
        axes[0, 1].set_title('不同位置的编码向量(前64维)', 
                            fontsize=14, fontweight='bold')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 不同维度的频率
        dims = [0, 10, 100, 500]
        x = np.arange(self.max_len)
        
        for dim in dims:
            axes[1, 0].plot(
                x,
                self.pe[:, dim].numpy(),
                label=f'维度 {dim}',
                linewidth=2,
                alpha=0.7
            )
        
        axes[1, 0].set_xlabel('位置', fontsize=12)
        axes[1, 0].set_ylabel('编码值', fontsize=12)
        axes[1, 0].set_title('不同维度的周期性模式', 
                            fontsize=14, fontweight='bold')
        axes[1, 0].legend()
        axes[1, 0].grid(True, alpha=0.3)
        
        # 4. 相对位置的内积(相似度)
        # 计算位置0与其他位置的余弦相似度
        pos_0 = self.pe[0].unsqueeze(0)
        similarities = F.cosine_similarity(
            pos_0.repeat(self.max_len, 1),
            self.pe,
            dim=1
        ).numpy()
        
        axes[1, 1].plot(x, similarities, linewidth=2.5, color='purple')
        axes[1, 1].set_xlabel('相对位置', fontsize=12)
        axes[1, 1].set_ylabel('余弦相似度', fontsize=12)
        axes[1, 1].set_title('位置0与其他位置的相似度', 
                            fontsize=14, fontweight='bold')
        axes[1, 1].grid(True, alpha=0.3)
        axes[1, 1].axhline(y=0, color='red', linestyle='--', alpha=0.5)
        
        plt.tight_layout()
        plt.show()
    
    def analyze_properties(self):
        """分析位置编码性质"""
        print("\n实验9:正弦位置编码性质分析\n")
        
        # 1. 唯一性
        print("1. 唯一性验证:")
        unique_encodings = torch.unique(self.pe, dim=0).shape[0]
        print(f"   不同的位置编码数: {unique_encodings}")
        print(f"   总位置数: {self.max_len}")
        print(f"   是否唯一: {'✅' if unique_encodings == self.max_len else '❌'}\n")
        
        # 2. 相对位置线性性
        print("2. 相对位置线性性:")
        print("   理论:PE(pos+k) 可以表示为 PE(pos) 的线性函数")
        
        k = 5
        pos = 10
        
        # 计算PE(pos+k) 和 PE(pos)的相似度
        similarity = F.cosine_similarity(
            self.pe[pos].unsqueeze(0),
            self.pe[pos+k].unsqueeze(0)
        ).item()
        
        print(f"   PE({pos}) 与 PE({pos+k}) 的余弦相似度: {similarity:.4f}")
        print(f"   含义:位置 {pos} 和 {pos+k} 有一定关系,但不完全相同\n")
        
        # 3. 外推能力
        print("3. 外推能力测试:")
        print("   正弦编码可以推广到未见过的序列长度")
        
        extended_pe = self.create_extended_encoding(self.max_len + 50)
        print(f"   原始最大长度: {self.max_len}")
        print(f"   扩展后长度: {extended_pe.shape[0]}")
        print(f"   编码维度保持: {extended_pe.shape[1]} = {self.d_model} ✅\n")
        
        # 4. 频率分布
        print("4. 频率分布:")
        print("   低维度:高频变化(局部位置信息)")
        print("   高维度:低频变化(全局位置信息)")
        
        # 计算不同维度的周期
        for dim in [0, 128, 256, 384, 510]:
            # 找到第一个周期(近似)
            signal = self.pe[:, dim].numpy()
            peaks = np.where(np.diff(np.sign(np.diff(signal))) < 0)[0] + 1
            
            if len(peaks) > 1:
                period = peaks[1] - peaks[0]
                print(f"   维度 {dim:3d}: 周期 ≈ {period:3d} 个位置")
        
        print()
    
    def create_extended_encoding(self, new_max_len):
        """创建扩展的位置编码"""
        pe = torch.zeros(new_max_len, self.d_model)
        position = torch.arange(0, new_max_len, dtype=torch.float).unsqueeze(1)
        
        div_term = torch.exp(
            torch.arange(0, self.d_model, 2).float() * 
            (-math.log(10000.0) / self.d_model)
        )
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        return pe

# 运行分析
pe_analyzer = PositionalEncodingAnalyzer(d_model=512, max_len=100)
pe_analyzer.visualize_encoding()
pe_analyzer.analyze_properties()

print("关键洞察:")
print("  ✓ 每个位置有唯一的编码向量")
print("  ✓ 相对位置关系通过向量运算可学习")
print("  ✓ 可以外推到训练时未见过的长度")
print("  ✓ 多尺度频率捕获局部和全局信息\n")

4.3 可学习位置编码 vs 固定位置编码

class LearnablePositionalEncoding(nn.Module):
    """可学习的位置编码"""
    
    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        
        self.dropout = nn.Dropout(dropout)
        
        # 可学习的位置嵌入
        self.position_embeddings = nn.Embedding(max_len, d_model)
        
    def forward(self, x):
        """
        Args:
            x: [batch, seq_len, d_model]
        """
        batch_size, seq_len, _ = x.shape
        
        positions = torch.arange(0, seq_len, device=x.device).unsqueeze(0)
        positions = positions.expand(batch_size, seq_len)
        
        pos_embeddings = self.position_embeddings(positions)
        
        x = x + pos_embeddings
        return self.dropout(x)


def compare_positional_encodings():
    """对比不同位置编码方案"""
    
    print("\n实验10:固定位置编码 vs 可学习位置编码\n")
    
    d_model = 128
    max_len = 50
    
    # 固定位置编码
    fixed_pe = PositionalEncoding(d_model, max_len)
    
    # 可学习位置编码
    learnable_pe = LearnablePositionalEncoding(d_model, max_len)
    
    # 创建测试输入
    x = torch.randn(4, max_len, d_model)
    
    # 应用编码
    x_fixed = fixed_pe(x.clone())
    x_learnable = learnable_pe(x.clone())
    
    print(f"输入形状: {x.shape}")
    print(f"固定编码输出: {x_fixed.shape}")
    print(f"可学习编码输出: {x_learnable.shape}\n")
    
    # 对比
    comparison = {
        '固定正弦编码': {
            '参数量': 0,
            '外推能力': '✅ 可外推到更长序列',
            '初始化': '数学公式定义',
            '训练': '不参与训练',
            '代表模型': 'Transformer (Vaswani 2017)'
        },
        '可学习编码': {
            '参数量': d_model * max_len,
            '外推能力': '❌ 受max_len限制',
            '初始化': '随机初始化',
            '训练': '反向传播学习',
            '代表模型': 'BERT, GPT'
        }
    }
    
    print("="*80)
    print(f"{'方法':<15} {'参数量':<15} {'外推能力':<25} {'代表模型':<20}")
    print("="*80)
    
    for method, props in comparison.items():
        print(f"{method:<15} {str(props['参数量']):<15} "
              f"{props['外推能力']:<25} {props['代表模型']:<20}")
    
    print("="*80)
    print()
    
    # 可视化对比
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
    
    # 固定编码的模式
    fixed_pattern = fixed_pe.pe[0, :max_len, :64].detach().numpy().T
    im1 = ax1.imshow(fixed_pattern, aspect='auto', cmap='coolwarm')
    ax1.set_title('固定正弦编码(前64维)', fontsize=13, fontweight='bold')
    ax1.set_xlabel('位置')
    ax1.set_ylabel('维度')
    plt.colorbar(im1, ax=ax1)
    
    # 可学习编码的初始模式
    with torch.no_grad():
        positions = torch.arange(0, max_len).unsqueeze(0)
        learnable_pattern = learnable_pe.position_embeddings(positions)[0, :, :64].numpy().T
    
    im2 = ax2.imshow(learnable_pattern, aspect='auto', cmap='coolwarm')
    ax2.set_title('可学习编码(初始化,前64维)', fontsize=13, fontweight='bold')
    ax2.set_xlabel('位置')
    ax2.set_ylabel('维度')
    plt.colorbar(im2, ax=ax2)
    
    plt.tight_layout()
    plt.show()
    
    print("选择建议:")
    print("  • 固定编码:适合需要外推的任务(如长文本生成)")
    print("  • 可学习编码:适合固定长度任务(如分类、NER)")
    print("  • 实践中:两者性能差异不大,BERT等大模型多用可学习编码\n")

compare_positional_encodings()

🎯 第五章:多头注意力 - 并行的智慧

5.1 为什么需要多头?

def visualize_multi_head_concept():
    """可视化多头注意力的核心概念"""
    
    print("\n实验11:多头注意力的直觉理解\n")
    
    # 类比:多个专家从不同角度看问题
    sentence = "The bank can guarantee deposits will eventually cover future tuition costs"
    
    print("示例句子:")
    print(f"  '{sentence}'\n")
    
    print("单头注意力的局限:")
    print("  只能学习一种关系模式")
    print("  例如:可能只关注句法关系(主谓宾)\n")
    
    print("多头注意力的优势:")
    heads_focus = {
        'Head 1 (句法)': ['bank → can', 'deposits → cover', 'costs → tuition'],
        'Head 2 (语义)': ['bank → deposits', 'guarantee → cover', 'future → eventually'],
        'Head 3 (共指)': ['deposits → costs', 'will → eventually'],
        'Head 4 (位置)': ['相邻词关系', '远距离依赖']
    }
    
    for head, relations in heads_focus.items():
        print(f"  {head}:")
        for rel in relations:
            print(f"    - {rel}")
    
    print("\n核心思想:不同头学习不同的注意力模式!\n")


class MultiHeadAttentionVisualizer:
    """多头注意力可视化工具"""
    
    def __init__(self, d_model=512, n_heads=8):
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        
        # 创建多头注意力层
        self.mha = MultiHeadAttention(d_model, n_heads)
    
    def analyze_head_specialization(self):
        """分析不同头的专业化"""
        
        print("\n实验12:注意力头的专业化分析\n")
        
        # 创建模拟句子
        batch_size = 1
        seq_len = 12
        x = torch.randn(batch_size, seq_len, self.d_model)
        
        # 前向传播
        output, attention = self.mha(x, x, x)
        
        # 分析每个头的注意力分布
        attention_np = attention[0].detach().numpy()  # [n_heads, seq_len, seq_len]
        
        fig, axes = plt.subplots(2, 4, figsize=(20, 10))
        axes = axes.flatten()
        
        words = ['The', 'cat', 'sat', 'on', 'the', 'mat', 'and', 
                 'watched', 'the', 'dog', 'play', '.']
        
        for head_idx in range(self.n_heads):
            ax = axes[head_idx]
            
            # 绘制注意力热图
            im = ax.imshow(attention_np[head_idx], cmap='YlOrRd', vmin=0, vmax=1)
            ax.set_xticks(range(seq_len))
            ax.set_yticks(range(seq_len))
            ax.set_xticklabels(words, rotation=45, ha='right', fontsize=9)
            ax.set_yticklabels(words, fontsize=9)
            ax.set_title(f'Head {head_idx + 1}', fontweight='bold', fontsize=12)
            
            # 添加颜色条
            plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
        
        plt.tight_layout()
        plt.show()
        
        # 计算每个头的统计特性
        print("各头注意力模式分析:\n")
        
        for head_idx in range(self.n_heads):
            head_attn = attention_np[head_idx]
            
            # 对角线权重(关注自身)
            diagonal_weight = np.mean(np.diag(head_attn))
            
            # 局部性(关注相邻词)
            local_weight = 0
            for i in range(seq_len):
                if i > 0:
                    local_weight += head_attn[i, i-1]
                if i < seq_len - 1:
                    local_weight += head_attn[i, i+1]
            local_weight /= (2 * seq_len - 2)
            
            # 全局性(关注远距离)
            global_weight = 0
            count = 0
            for i in range(seq_len):
                for j in range(seq_len):
                    if abs(i - j) > 3:
                        global_weight += head_attn[i, j]
                        count += 1
            global_weight /= count if count > 0 else 1
            
            print(f"Head {head_idx + 1}:")
            print(f"  自注意权重: {diagonal_weight:.3f}")
            print(f"  局部权重:   {local_weight:.3f}")
            print(f"  全局权重:   {global_weight:.3f}")
            
            # 判断模式
            if diagonal_weight > 0.3:
                pattern = "自关注型"
            elif local_weight > global_weight:
                pattern = "局部型"
            else:
                pattern = "全局型"
            
            print(f"  模式类型:   {pattern}\n")
    
    def visualize_head_projection(self):
        """可视化头的投影空间分割"""
        
        print("\n实验13:多头如何分割特征空间\n")
        
        print(f"模型维度 d_model: {self.d_model}")
        print(f"注意力头数 n_heads: {self.n_heads}")
        print(f"每头维度 d_k: {self.d_k}\n")
        
        # 可视化维度分割
        fig, ax = plt.subplots(figsize=(14, 4))
        
        colors = plt.cm.Set3(range(self.n_heads))
        
        for head_idx in range(self.n_heads):
            start = head_idx * self.d_k
            end = (head_idx + 1) * self.d_k
            
            ax.barh(0, self.d_k, left=start, height=0.8, 
                   color=colors[head_idx], edgecolor='black', linewidth=2,
                   label=f'Head {head_idx + 1}')
            
            # 标注维度范围
            ax.text(start + self.d_k/2, 0, f'{start}-{end}', 
                   ha='center', va='center', fontsize=10, fontweight='bold')
        
        ax.set_xlim(0, self.d_model)
        ax.set_ylim(-0.5, 0.5)
        ax.set_xlabel('特征维度', fontsize=12)
        ax.set_title(f'多头注意力特征空间分割({self.n_heads}个头)', 
                    fontsize=14, fontweight='bold')
        ax.set_yticks([])
        ax.legend(loc='upper left', bbox_to_anchor=(1, 1), ncol=1)
        
        plt.tight_layout()
        plt.show()
        
        print("关键机制:")
        print(f"  1. 输入 x ∈ R^{self.d_model} 被投影到 {self.n_heads} 个子空间")
        print(f"  2. 每个子空间维度为 d_k = {self.d_k}")
        print(f"  3. 各头独立计算注意力,学习不同模式")
        print(f"  4. 最后拼接并线性变换回 R^{self.d_model}\n")
        
        # 展示计算流程
        print("数学公式:")
        print("  MultiHead(Q, K, V) = Concat(head₁, ..., head₈)·W^O")
        print("  其中:")
        print("    head_i = Attention(Q·W^Q_i, K·W^K_i, V·W^V_i)")
        print(f"    W^Q_i, W^K_i, W^V_i ∈ R^({self.d_model}×{self.d_k})")
        print(f"    W^O ∈ R^({self.d_model}×{self.d_model})\n")


visualize_multi_head_concept()

mha_viz = MultiHeadAttentionVisualizer(d_model=512, n_heads=8)
mha_viz.analyze_head_specialization()
mha_viz.visualize_head_projection()

5.2 多头注意力的计算效率

class MultiHeadEfficiencyAnalyzer:
    """多头注意力效率分析"""
    
    def __init__(self):
        pass
    
    def compare_implementations(self):
        """对比不同实现方式"""
        
        print("\n实验14:多头注意力的高效实现\n")
        
        d_model = 512
        n_heads = 8
        batch_size = 32
        seq_len = 128
        
        # 方案1:循环实现(低效)
        print("方案1:循环计算每个头(朴素实现)")
        print("  for head in range(n_heads):")
        print("      head_output = attention(Q_head, K_head, V_head)")
        print("  缺点:无法并行,GPU利用率低\n")
        
        # 方案2:批量并行实现(高效)
        print("方案2:批量并行计算(实际使用)")
        print("  Q, K, V reshape: [batch, seq_len, n_heads, d_k]")
        print("  transpose: [batch, n_heads, seq_len, d_k]")
        print("  批量矩阵乘法:所有头同时计算")
        print("  优点:充分利用GPU并行能力\n")
        
        # 性能对比
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        
        # 创建测试数据
        Q = torch.randn(batch_size, seq_len, d_model, device=device)
        K = torch.randn(batch_size, seq_len, d_model, device=device)
        V = torch.randn(batch_size, seq_len, d_model, device=device)
        
        # 创建多头注意力层
        mha = MultiHeadAttention(d_model, n_heads).to(device)
        
        # 预热
        with torch.no_grad():
            for _ in range(10):
                _ = mha(Q, K, V)
        
        # 计时
        import time
        
        n_runs = 100
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        
        start = time.time()
        with torch.no_grad():
            for _ in range(n_runs):
                output, _ = mha(Q, K, V)
        
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        end = time.time()
        
        avg_time = (end - start) / n_runs * 1000  # 转换为毫秒
        
        print(f"性能测试结果({device.upper()}):")
        print(f"  批次大小: {batch_size}")
        print(f"  序列长度: {seq_len}")
        print(f"  平均时间: {avg_time:.2f} ms")
        print(f"  吞吐量: {batch_size * seq_len / avg_time * 1000:.0f} tokens/s\n")
        
        # FLOPs分析
        print("计算复杂度分析:")
        
        # Q·K^T
        qk_flops = batch_size * n_heads * seq_len * seq_len * (d_model // n_heads)
        
        # softmax(QK^T)·V
        sv_flops = batch_size * n_heads * seq_len * seq_len * (d_model // n_heads)
        
        # 线性投影(Q, K, V, O)
        proj_flops = 4 * batch_size * seq_len * d_model * d_model
        
        total_flops = qk_flops + sv_flops + proj_flops
        
        print(f"  QK^T计算:    {qk_flops / 1e9:.2f} GFLOPs")
        print(f"  Softmax·V:   {sv_flops / 1e9:.2f} GFLOPs")
        print(f"  线性投影:    {proj_flops / 1e9:.2f} GFLOPs")
        print(f"  总计:        {total_flops / 1e9:.2f} GFLOPs\n")
        
        # 内存占用
        print("内存占用分析:")
        
        # 输入
        input_mem = 3 * batch_size * seq_len * d_model * 4 / 1024**2  # float32
        
        # 注意力矩阵
        attn_mem = batch_size * n_heads * seq_len * seq_len * 4 / 1024**2
        
        # 权重矩阵
        weight_mem = 4 * d_model * d_model * 4 / 1024**2
        
        total_mem = input_mem + attn_mem + weight_mem
        
        print(f"  输入张量:    {input_mem:.2f} MB")
        print(f"  注意力矩阵:  {attn_mem:.2f} MB")
        print(f"  权重参数:    {weight_mem:.2f} MB")
        print(f"  总计:        {total_mem:.2f} MB\n")
    
    def visualize_bottleneck(self):
        """可视化计算瓶颈"""
        
        seq_lengths = [64, 128, 256, 512, 1024, 2048]
        d_model = 512
        n_heads = 8
        batch_size = 16
        
        attn_flops = []
        proj_flops = []
        attn_mem = []
        
        for seq_len in seq_lengths:
            # 注意力计算
            attn_f = 2 * batch_size * n_heads * seq_len**2 * (d_model // n_heads)
            attn_flops.append(attn_f / 1e9)
            
            # 投影计算
            proj_f = 4 * batch_size * seq_len * d_model**2
            proj_flops.append(proj_f / 1e9)
            
            # 注意力矩阵内存
            attn_m = batch_size * n_heads * seq_len**2 * 4 / 1024**2
            attn_mem.append(attn_m)
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
        
        # FLOPs对比
        x = np.arange(len(seq_lengths))
        width = 0.35
        
        ax1.bar(x - width/2, attn_flops, width, label='注意力计算 (O(n²d))', 
               color='coral', alpha=0.8)
        ax1.bar(x + width/2, proj_flops, width, label='线性投影 (O(nd²))', 
               color='skyblue', alpha=0.8)
        
        ax1.set_xlabel('序列长度', fontsize=12)
        ax1.set_ylabel('计算量 (GFLOPs)', fontsize=12)
        ax1.set_title('计算量随序列长度变化', fontsize=14, fontweight='bold')
        ax1.set_xticks(x)
        ax1.set_xticklabels(seq_lengths)
        ax1.legend(fontsize=11)
        ax1.grid(True, alpha=0.3, axis='y')
        
        # 内存占用
        ax2.plot(seq_lengths, attn_mem, marker='o', linewidth=2.5, 
                markersize=8, color='red', label='注意力矩阵')
        ax2.fill_between(seq_lengths, attn_mem, alpha=0.3, color='red')
        
        ax2.set_xlabel('序列长度', fontsize=12)
        ax2.set_ylabel('内存占用 (MB)', fontsize=12)
        ax2.set_title('注意力矩阵内存随序列长度变化', fontsize=14, fontweight='bold')
        ax2.legend(fontsize=11)
        ax2.grid(True, alpha=0.3)
        ax2.set_yscale('log')
        
        # 标注关键点
        for i, (sl, mem) in enumerate(zip(seq_lengths, attn_mem)):
            if sl in [512, 2048]:
                ax2.annotate(f'{mem:.0f} MB', 
                           xy=(sl, mem), 
                           xytext=(10, 10),
                           textcoords='offset points',
                           fontsize=10,
                           bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.5))
        
        plt.tight_layout()
        plt.show()
        
        print("\n瓶颈分析:")
        print("  ✓ 序列长度 < 512:线性投影是主要计算量")
        print("  ✓ 序列长度 > 512:注意力计算(O(n²))成为瓶颈")
        print("  ✓ 长序列场景:注意力矩阵内存占用急剧增长")
        print("  → 解决方案:稀疏注意力、线性注意力(第9章)\n")


analyzer = MultiHeadEfficiencyAnalyzer()
analyzer.compare_implementations()
analyzer.visualize_bottleneck()

🔗 第六章:前馈网络与残差连接 - 稳定性的基石

6.1 Position-wise Feed-Forward Network

class FFNAnalyzer:
    """前馈网络分析器"""
    
    def __init__(self):
        pass
    
    def explain_ffn_role(self):
        """解释FFN的作用"""
        
        print("\n实验15:前馈网络的作用\n")
        
        print("FFN = Feed-Forward Network(位置前馈网络)\n")
        
        print("定义:")
        print("  FFN(x) = max(0, x·W₁ + b₁)·W₂ + b₂")
        print("  或者:")
        print("  FFN(x) = ReLU(x·W₁ + b₁)·W₂ + b₂\n")
        
        print("关键特点:")
        print("  1. Position-wise:对每个位置独立应用")
        print("     即:FFN(x[i]) 与其他位置无关")
        print("  2. 两层全连接网络")
        print("     第一层:d_model → d_ff (扩张,通常d_ff = 4×d_model)")
        print("     第二层:d_ff → d_model (收缩)")
        print("  3. ReLU激活函数引入非线性\n")
        
        print("为什么需要FFN?")
        print("  问题:注意力机制本质是加权平均(线性操作)")
        print("  解决:FFN提供非线性变换能力")
        print("  类比:CNN中的1×1卷积 + 非线性激活\n")
    
    def visualize_ffn_transformation(self):
        """可视化FFN的变换"""
        
        print("\n实验16:FFN的特征变换可视化\n")
        
        d_model = 512
        d_ff = 2048
        batch_size = 1
        seq_len = 10
        
        # 创建FFN
        ffn = PositionwiseFeedForward(d_model, d_ff)
        
        # 输入数据
        x = torch.randn(batch_size, seq_len, d_model)
        
        # 前向传播并记录中间值
        with torch.no_grad():
            # 第一层
            hidden = ffn.linear1(x)  # [batch, seq_len, d_ff]
            hidden_activated = F.relu(hidden)
            
            # 第二层
            output = ffn.linear2(hidden_activated)  # [batch, seq_len, d_model]
        
        # 可视化
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        
        # 输入
        im1 = axes[0, 0].imshow(x[0].numpy().T, aspect='auto', cmap='viridis')
        axes[0, 0].set_title(f'输入 [{seq_len}, {d_model}]', fontweight='bold')
        axes[0, 0].set_xlabel('序列位置')
        axes[0, 0].set_ylabel('特征维度')
        plt.colorbar(im1, ax=axes[0, 0])
        
        # 第一层输出(ReLU前)
        im2 = axes[0, 1].imshow(hidden[0].numpy().T, aspect='auto', cmap='viridis')
        axes[0, 1].set_title(f'第一层 [{seq_len}, {d_ff}] (ReLU前)', fontweight='bold')
        axes[0, 1].set_xlabel('序列位置')
        axes[0, 1].set_ylabel('特征维度')
        plt.colorbar(im2, ax=axes[0, 1])
        
        # 第一层输出(ReLU后)
        im3 = axes[1, 0].imshow(hidden_activated[0].numpy().T, aspect='auto', cmap='viridis')
        axes[1, 0].set_title(f'第一层 [{seq_len}, {d_ff}] (ReLU后)', fontweight='bold')
        axes[1, 0].set_xlabel('序列位置')
        axes[1, 0].set_ylabel('特征维度')
        plt.colorbar(im3, ax=axes[1, 0])
        
        # 输出
        im4 = axes[1, 1].imshow(output[0].numpy().T, aspect='auto', cmap='viridis')
        axes[1, 1].set_title(f'输出 [{seq_len}, {d_model}]', fontweight='bold')
        axes[1, 1].set_xlabel('序列位置')
        axes[1, 1].set_ylabel('特征维度')
        plt.colorbar(im4, ax=axes[1, 1])
        
        plt.tight_layout()
        plt.show()
        
        # 统计分析
        print("ReLU的稀疏化效果:")
        sparsity_before = (hidden[0] == 0).float().mean().item() * 100
        sparsity_after = (hidden_activated[0] == 0).float().mean().item() * 100
        
        print(f"  ReLU前零元素比例: {sparsity_before:.2f}%")
        print(f"  ReLU后零元素比例: {sparsity_after:.2f}%")
        print(f"  稀疏度提升: {sparsity_after - sparsity_before:.2f}%\n")
        
        print("稀疏激活的好处:")
        print("  ✓ 提高模型容量(神经元选择性激活)")
        print("  ✓ 减少过拟合风险")
        print("  ✓ 加速推理(可利用稀疏计算优化)\n")


ffn_analyzer = FFNAnalyzer()
ffn_analyzer.explain_ffn_role()
ffn_analyzer.visualize_ffn_transformation()

6.2 残差连接与层归一化

class ResidualConnectionAnalyzer:
    """残差连接分析器"""
    
    def __init__(self):
        pass
    
    def explain_residual_connection(self):
        """解释残差连接"""
        
        print("\n实验17:残差连接的重要性\n")
        
        print("定义:")
        print("  output = LayerNorm(x + Sublayer(x))")
        print("  其中 Sublayer 可以是:")
        print("    - 多头注意力")
        print("    - 前馈网络\n")
        
        print("为什么需要残差连接?")
        print("  1. 缓解梯度消失问题")
        print("     反向传播时,梯度可以直接通过shortcut传递")
        print("  2. 允许更深的网络")
        print("     原始Transformer:6层编码器 + 6层解码器")
        print("     GPT-3:96层!")
        print("  3. 加速训练收敛")
        print("     初始时,Sublayer(x) ≈ 0,网络从恒等映射开始学习\n")
    
    def demonstrate_gradient_flow(self):
        """演示梯度流"""
        
        print("\n实验18:残差连接的梯度流分析\n")
        
        # 创建简单的网络:有残差 vs 无残差
        class SimpleBlockWithResidual(nn.Module):
            def __init__(self, dim):
                super().__init__()
                self.layer = nn.Linear(dim, dim)
                self.norm = nn.LayerNorm(dim)
            
            def forward(self, x):
                return self.norm(x + self.layer(x))
        
        class SimpleBlockWithoutResidual(nn.Module):
            def __init__(self, dim):
                super().__init__()
                self.layer = nn.Linear(dim, dim)
                self.norm = nn.LayerNorm(dim)
            
            def forward(self, x):
                return self.norm(self.layer(x))
        
        # 堆叠多层
        n_layers = 10
        dim = 128
        
        # 有残差
        model_with_res = nn.Sequential(*[
            SimpleBlockWithResidual(dim) for _ in range(n_layers)
        ])
        
        # 无残差
        model_without_res = nn.Sequential(*[
            SimpleBlockWithoutResidual(dim) for _ in range(n_layers)
        ])
        
        # 输入
        x = torch.randn(1, 10, dim, requires_grad=True)
        
        # 前向传播
        out_with_res = model_with_res(x)
        out_without_res = model_without_res(x.clone())
        
        # 反向传播
        loss_with_res = out_with_res.sum()
        loss_without_res = out_without_res.sum()
        
        loss_with_res.backward()
        grad_with_res = x.grad.clone()
        
        x.grad.zero_()
        loss_without_res.backward()
        grad_without_res = x.grad.clone()
        
        # 比较梯度范数
        grad_norm_with = grad_with_res.norm().item()
        grad_norm_without = grad_without_res.norm().item()
        
        print(f"梯度范数对比({n_layers}层网络):")
        print(f"  有残差连接:   {grad_norm_with:.4f}")
        print(f"  无残差连接:   {grad_norm_without:.4f}")
        print(f"  差异:         {grad_norm_with / grad_norm_without:.2f}×\n")
        
        # 逐层梯度范数
        print("逐层梯度分析:\n")
        
        grad_norms_with = []
        grad_norms_without = []
        
        # 有残差
        for i, layer in enumerate(model_with_res):
            for param in layer.parameters():
                if param.grad is not None:
                    grad_norms_with.append(param.grad.norm().item())
                    break
        
        # 无残差
        for i, layer in enumerate(model_without_res):
            for param in layer.parameters():
                if param.grad is not None:
                    grad_norms_without.append(param.grad.norm().item())
                    break
        
        # 可视化
        fig, ax = plt.subplots(figsize=(12, 6))
        
        layers = range(1, n_layers + 1)
        ax.plot(layers, grad_norms_with, marker='o', linewidth=2.5, 
               label='有残差连接', color='green')
        ax.plot(layers, grad_norms_without, marker='s', linewidth=2.5,
               label='无残差连接', color='red')
        
        ax.set_xlabel('层编号', fontsize=12)
        ax.set_ylabel('梯度范数', fontsize=12)
        ax.set_title('梯度在网络中的传播', fontsize=14, fontweight='bold')
        ax.legend(fontsize=11)
        ax.grid(True, alpha=0.3)
        ax.set_yscale('log')
        
        plt.tight_layout()
        plt.show()
        
        print("观察:")
        print("  ✓ 有残差:梯度相对稳定")
        print("  ✓ 无残差:梯度逐层衰减(梯度消失)\n")
    
    def explain_layer_normalization(self):
        """解释层归一化"""
        
        print("\n实验19:层归一化 vs 批归一化\n")
        
        print("Layer Normalization (LN):")
        print("  对每个样本的特征维度归一化")
        print("  μ = mean(x[i, :])  # 对特征维度求均值")
        print("  σ = std(x[i, :])   # 对特征维度求标准差")
        print("  LN(x[i]) = γ × (x[i] - μ) / σ + β\n")
        
        print("Batch Normalization (BN):")
        print("  对每个特征维度的批次归一化")
        print("  μ = mean(x[:, j])  # 对批次维度求均值")
        print("  σ = std(x[:, j])   # 对批次维度求标准差\n")
        
        # 可视化对比
        batch_size = 4
        seq_len = 5
        d_model = 6
        
        x = torch.randn(batch_size, seq_len, d_model)
        
        # Layer Norm
        ln = nn.LayerNorm(d_model)
        x_ln = ln(x)
        
        # Batch Norm (需要调整维度)
        bn = nn.BatchNorm1d(d_model)
        x_bn = bn(x.transpose(1, 2)).transpose(1, 2)
        
        fig, axes = plt.subplots(1, 3, figsize=(16, 4))
        
        # 原始数据
        im1 = axes[0].imshow(x[0].numpy(), cmap='coolwarm', aspect='auto')
        axes[0].set_title('原始数据(第1个样本)', fontweight='bold')
        axes[0].set_xlabel('特征维度')
        axes[0].set_ylabel('序列位置')
        plt.colorbar(im1, ax=axes[0])
        
        # Layer Norm
        im2 = axes[1].imshow(x_ln[0].detach().numpy(), cmap='coolwarm', aspect='auto')
        axes[1].set_title('Layer Norm后', fontweight='bold')
        axes[1].set_xlabel('特征维度')
        axes[1].set_ylabel('序列位置')
        plt.colorbar(im2, ax=axes[1])
        
        # Batch Norm
        im3 = axes[2].imshow(x_bn[0].detach().numpy(), cmap='coolwarm', aspect='auto')
        axes[2].set_title('Batch Norm后', fontweight='bold')
        axes[2].set_xlabel('特征维度')
        axes[2].set_ylabel('序列位置')
        plt.colorbar(im3, ax=axes[2])
        
        plt.tight_layout()
        plt.show()
        
        print("为什么Transformer使用Layer Norm?")
        print("  1. 序列长度可变:BN依赖批次统计,处理变长序列困难")
        print("  2. 小批次友好:LN不依赖批次大小")
        print("  3. 推理一致性:LN训练和推理行为相同")
        print("  4. RNN友好:可以逐步处理序列\n")


res_analyzer = ResidualConnectionAnalyzer()
res_analyzer.explain_residual_connection()
res_analyzer.demonstrate_gradient_flow()
res_analyzer.explain_layer_normalization()

🤖 第七章:从GPT到BERT - 预训练范式的分野

7.1 自回归语言模型:GPT系列

class GPTStyleDecoder(nn.Module):
    """GPT风格的仅解码器架构"""
    
    def __init__(
        self,
        vocab_size,
        d_model=768,
        n_heads=12,
        n_layers=12,
        d_ff=3072,
        max_len=1024,
        dropout=0.1
    ):
        super().__init__()
        
        # Token嵌入
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        
        # 位置嵌入(可学习)
        self.position_embedding = nn.Embedding(max_len, d_model)
        
        # Transformer解码器层(仅使用掩码自注意力)
        self.layers = nn.ModuleList([
            GPTBlock(d_model, n_heads, d_ff, dropout)
            for _ in range(n_layers)
        ])
        
        # 输出层
        self.ln_f = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size, bias=False)
        
        # 权重绑定(Token嵌入和输出层共享权重)
        self.head.weight = self.token_embedding.weight
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, input_ids, past_key_values=None):
        """
        Args:
            input_ids: [batch, seq_len]
            past_key_values: 缓存的K, V(用于生成)
        
        Returns:
            logits: [batch, seq_len, vocab_size]
        """
        batch_size, seq_len = input_ids.shape
        
        # 位置索引
        if past_key_values is None:
            past_length = 0
            position_ids = torch.arange(0, seq_len, device=input_ids.device)
        else:
            past_length = past_key_values[0][0].size(-2)
            position_ids = torch.arange(past_length, past_length + seq_len, 
                                       device=input_ids.device)
        
        position_ids = position_ids.unsqueeze(0)
        
        # Token嵌入 + 位置嵌入
        token_embeddings = self.token_embedding(input_ids)
        position_embeddings = self.position_embedding(position_ids)
        
        x = self.dropout(token_embeddings + position_embeddings)
        
        # 通过所有层
        presents = []
        for i, layer in enumerate(self.layers):
            past = past_key_values[i] if past_key_values is not None else None
            x, present = layer(x, past=past)
            presents.append(present)
        
        x = self.ln_f(x)
        
        # 输出logits
        logits = self.head(x)
        
        return logits, presents


class GPTBlock(nn.Module):
    """GPT块(仅掩码自注意力 + FFN)"""
    
    def __init__(self, d_model, n_heads, d_ff, dropout):
        super().__init__()
        
        self.ln_1 = nn.LayerNorm(d_model)
        self.attn = CausalSelfAttention(d_model, n_heads, dropout)
        self.ln_2 = nn.LayerNorm(d_model)
        self.mlp = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Linear(d_ff, d_model),
            nn.Dropout(dropout)
        )
    
    def forward(self, x, past=None):
        # 掩码自注意力
        attn_output, present = self.attn(self.ln_1(x), past=past)
        x = x + attn_output
        
        # FFN
        x = x + self.mlp(self.ln_2(x))
        
        return x, present


class CausalSelfAttention(nn.Module):
    """因果自注意力(GPT使用)"""
    
    def __init__(self, d_model, n_heads, dropout):
        super().__init__()
        
        assert d_model % n_heads == 0
        
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        
        # Q, K, V投影(合并为一个矩阵加速)
        self.c_attn = nn.Linear(d_model, 3 * d_model)
        
        # 输出投影
        self.c_proj = nn.Linear(d_model, d_model)
        
        self.attn_dropout = nn.Dropout(dropout)
        self.resid_dropout = nn.Dropout(dropout)
        
        # 因果掩码(下三角)
        self.register_buffer(
            "bias",
            torch.tril(torch.ones(1024, 1024)).view(1, 1, 1024, 1024)
        )
    
    def forward(self, x, past=None):
        batch_size, seq_len, d_model = x.shape
        
        # 计算Q, K, V
        qkv = self.c_attn(x)
        q, k, v = qkv.split(self.d_model, dim=2)
        
        # 重塑为多头
        q = q.view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        k = k.view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        v = v.view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        
        # 如果有缓存,拼接
        if past is not None:
            past_k, past_v = past
            k = torch.cat([past_k, k], dim=-2)
            v = torch.cat([past_v, v], dim=-2)
        
        present = (k, v)
        
        # 注意力计算
        attn = (q @ k.transpose(-2, -1)) / math.sqrt(self.d_k)
        
        # 应用因果掩码
        attn = attn.masked_fill(
            self.bias[:, :, :seq_len, :k.size(-2)] == 0,
            float('-inf')
        )
        
        attn = F.softmax(attn, dim=-1)
        attn = self.attn_dropout(attn)
        
        y = attn @ v
        
        # 拼接多头
        y = y.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)
        
        # 输出投影
        y = self.resid_dropout(self.c_proj(y))
        
        return y, present


def demonstrate_gpt_generation():
    """演示GPT的文本生成"""
    
    print("\n实验20:GPT自回归生成过程\n")
    
    print("GPT的核心特点:")
    print("  1. 架构:仅解码器(Decoder-only)")
    print("  2. 训练:自回归语言建模")
    print("     P(x₁, x₂, ..., xₙ) = ∏ P(xᵢ | x₁, ..., xᵢ₋₁)")
    print("  3. 掩码:因果掩码(只能看到左侧上下文)")
    print("  4. 任务:零样本/少样本学习\n")
    
    # 创建小型GPT
    vocab_size = 5000
    gpt = GPTStyleDecoder(
        vocab_size=vocab_size,
        d_model=256,
        n_heads=8,
        n_layers=6,
        d_ff=1024,
        max_len=128
    )
    
    print(f"模型参数量: {sum(p.numel() for p in gpt.parameters()):,}\n")
    
    # 模拟生成过程
    print("生成过程模拟:")
    print("  输入提示: 'The cat sat on'")
    print("  目标: 生成接下来的词\n")
    
    # 模拟输入(假设已tokenize)
    input_ids = torch.tensor([[1, 234, 567, 89, 456]])  # [batch=1, seq_len=5]
    
    # 前向传播
    with torch.no_grad():
        logits, _ = gpt(input_ids)
    
    print(f"输出logits形状: {logits.shape}")
    print(f"  - [batch_size, seq_len, vocab_size]\n")
    
    # 预测下一个词
    next_token_logits = logits[0, -1, :]  # 取最后一个位置
    probs = F.softmax(next_token_logits, dim=-1)
    
    # Top-5预测
    top5_probs, top5_indices = torch.topk(probs, 5)
    
    print("Top-5预测(假设词表):")
    fake_vocab = ['the', 'mat', 'floor', 'table', 'chair']
    for i, (idx, prob) in enumerate(zip(top5_indices, top5_probs)):
        print(f"  {i+1}. '{fake_vocab[i]}' (ID: {idx.item()}, 概率: {prob.item():.4f})")
    
    print("\n生成策略:")
    print("  • Greedy: 选择概率最高的词")
    print("  • Sampling: 按概率分布随机采样")
    print("  • Top-k Sampling: 从概率最高的k个词中采样")
    print("  • Nucleus (Top-p) Sampling: 从累积概率达到p的词中采样\n")
    
    # 可视化因果掩码
    seq_len = 10
    causal_mask = torch.tril(torch.ones(seq_len, seq_len))
    
    plt.figure(figsize=(8, 8))
    plt.imshow(causal_mask.numpy(), cmap='Greys', interpolation='nearest')
    plt.title('GPT的因果注意力掩码', fontsize=14, fontweight='bold')
    plt.xlabel('Key位置(可见的上下文)', fontsize=12)
    plt.ylabel('Query位置(当前生成位置)', fontsize=12)
    
    # 标注
    for i in range(seq_len):
        for j in range(seq_len):
            text = '✓' if causal_mask[i, j] == 1 else '✗'
            color = 'white' if causal_mask[i, j] == 1 else 'black'
            plt.text(j, i, text, ha='center', va='center', 
                    color=color, fontsize=12, fontweight='bold')
    
    plt.tight_layout()
    plt.show()
    
    print("掩码解读:")
    print("  ✓ 白色区域:允许注意")
    print("  ✗ 黑色区域:禁止注意(未来信息)")
    print("  → 确保生成第i个词时,只能看到前i-1个词\n")


demonstrate_gpt_generation()

7.2 掩码语言模型:BERT

class BERTStyleEncoder(nn.Module):
    """BERT风格的仅编码器架构"""
    
    def __init__(
        self,
        vocab_size,
        d_model=768,
        n_heads=12,
        n_layers=12,
        d_ff=3072,
        max_len=512,
        dropout=0.1
    ):
        super().__init__()
        
        # Token嵌入
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        
        # 位置嵌入
        self.position_embedding = nn.Embedding(max_len, d_model)
        
        # Token类型嵌入(用于句子对任务)
        self.token_type_embedding = nn.Embedding(2, d_model)
        
        # Transformer编码器层
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, n_heads, d_ff, dropout)
            for _ in range(n_layers)
        ])
        
        self.ln = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, input_ids, token_type_ids=None, attention_mask=None):
        """
        Args:
            input_ids: [batch, seq_len]
            token_type_ids: [batch, seq_len] (0或1,区分句子A/B)
            attention_mask: [batch, seq_len] (1=有效,0=padding)
        
        Returns:
            hidden_states: [batch, seq_len, d_model]
        """
        batch_size, seq_len = input_ids.shape
        
        # 位置索引
        position_ids = torch.arange(0, seq_len, device=input_ids.device)
        position_ids = position_ids.unsqueeze(0).expand_as(input_ids)
        
        # Token类型(默认全0)
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)
        
        # 三种嵌入相加
        token_embeddings = self.token_embedding(input_ids)
        position_embeddings = self.position_embedding(position_ids)
        token_type_embeddings = self.token_type_embedding(token_type_ids)
        
        embeddings = token_embeddings + position_embeddings + token_type_embeddings
        embeddings = self.dropout(embeddings)
        
        # 扩展attention_mask
        if attention_mask is not None:
            attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)
            # [batch, 1, 1, seq_len]
        
        # 通过所有编码器层
        hidden_states = embeddings
        for layer in self.layers:
            hidden_states, _ = layer(hidden_states, attention_mask)
        
        hidden_states = self.ln(hidden_states)
        
        return hidden_states


class MLMHead(nn.Module):
    """掩码语言模型头"""
    
    def __init__(self, d_model, vocab_size):
        super().__init__()
        
        self.dense = nn.Linear(d_model, d_model)
        self.layer_norm = nn.LayerNorm(d_model)
        self.decoder = nn.Linear(d_model, vocab_size, bias=False)
        self.bias = nn.Parameter(torch.zeros(vocab_size))
        
        self.decoder.bias = self.bias
        
    def forward(self, hidden_states):
        """
        Args:
            hidden_states: [batch, seq_len, d_model]
        
        Returns:
            logits: [batch, seq_len, vocab_size]
        """
        hidden_states = self.dense(hidden_states)
        hidden_states = F.gelu(hidden_states)
        hidden_states = self.layer_norm(hidden_states)
        
        logits = self.decoder(hidden_states)
        
        return logits


def demonstrate_bert_mlm():
    """演示BERT的掩码语言建模"""
    
    print("\n实验21:BERT掩码语言建模\n")
    
    print("BERT的核心特点:")
    print("  1. 架构:仅编码器(Encoder-only)")
    print("  2. 训练:掩码语言建模(MLM)+ 下一句预测(NSP)")
    print("  3. 掩码:无掩码(双向注意力)")
    print("  4. 任务:特征提取、分类、问答等\n")
    
    # 创建BERT
    vocab_size = 30000
    bert = BERTStyleEncoder(
        vocab_size=vocab_size,
        d_model=768,
        n_heads=12,
        n_layers=12,
        d_ff=3072,
        max_len=512
    )
    
    mlm_head = MLMHead(768, vocab_size)
    
    print(f"BERT参数量: {sum(p.numel() for p in bert.parameters()):,}")
    print(f"MLM头参数量: {sum(p.numel() for p in mlm_head.parameters()):,}\n")
    
    # 模拟MLM任务
    print("MLM训练示例:")
    original_text = "The quick brown fox jumps over the lazy dog"
    masked_text = "The quick [MASK] fox jumps over the [MASK] dog"
    
    print(f"  原始: {original_text}")
    print(f"  掩码: {masked_text}")
    print(f"  目标: 预测 [MASK] 位置的词\n")
    
    # 模拟输入(假设已tokenize)
    # [CLS] The quick [MASK] fox ... [MASK] dog [SEP]
    input_ids = torch.tensor([[
        101,  # [CLS]
        1996, 4248, 103, 4419, 14523, 2058, 1996, 103, 3899, 
        102   # [SEP]
    ]])
    
    # 掩码位置
    masked_positions = [3, 9]  # 索引
    
    # 前向传播
    with torch.no_grad():
        hidden_states = bert(input_ids)
        logits = mlm_head(hidden_states)
    
    print(f"输出logits形状: {logits.shape}\n")
    
    # 预测掩码位置
    print("预测结果:")
    fake_vocab = {103: '[MASK]', 2829: 'brown', 13971: 'lazy'}
    
    for pos in masked_positions:
        masked_logits = logits[0, pos, :]
        probs = F.softmax(masked_logits, dim=-1)
        
        # Top-3预测
        top3_probs, top3_indices = torch.topk(probs, 3)
        
        print(f"\n  位置{pos} ([MASK]):")
        for i, (idx, prob) in enumerate(zip(top3_indices, top3_probs)):
            word = fake_vocab.get(idx.item(), f'word_{idx.item()}')
            print(f"    {i+1}. {word} (概率: {prob.item():.4f})")
    
    print("\n\nMLM训练策略(BERT论文):")
    print("  1. 随机选择15%的token")
    print("  2. 其中:")
    print("     - 80%替换为[MASK]")
    print("     - 10%替换为随机词")
    print("     - 10%保持不变")
    print("  3. 目的:防止模型过度依赖[MASK]标记\n")
    
    # 可视化双向注意力
    seq_len = 10
    bidirectional_mask = torch.ones(seq_len, seq_len)
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
    
    # BERT双向注意力
    im1 = ax1.imshow(bidirectional_mask.numpy(), cmap='Greens', 
                     interpolation='nearest')
    ax1.set_title('BERT双向注意力(无掩码)', fontsize=13, fontweight='bold')
    ax1.set_xlabel('Key位置', fontsize=11)
    ax1.set_ylabel('Query位置', fontsize=11)
    plt.colorbar(im1, ax=ax1)
    
    # GPT因果注意力(对比)
    causal_mask = torch.tril(torch.ones(seq_len, seq_len))
    im2 = ax2.imshow(causal_mask.numpy(), cmap='Reds', 
                     interpolation='nearest')
    ax2.set_title('GPT因果注意力(对比)', fontsize=13, fontweight='bold')
    ax2.set_xlabel('Key位置', fontsize=11)
    ax2.set_ylabel('Query位置', fontsize=11)
    plt.colorbar(im2, ax=ax2)
    
    plt.tight_layout()
    plt.show()
    
    print("BERT vs GPT架构对比:\n")
    
    comparison = {
        '特性': ['架构', '注意力', '训练目标', '典型任务', '推理方式'],
        'BERT': [
            '编码器',
            '双向(看全文)',
            'MLM + NSP',
            '分类、NER、QA',
            '并行(一次处理全文)'
        ],
        'GPT': [
            '解码器',
            '单向(仅看左侧)',
            '自回归LM',
            '生成、对话',
            '自回归(逐词生成)'
        ]
    }
    
    print(f"{'特性':<15} {'BERT':<25} {'GPT':<25}")
    print("="*70)
    
    for i, feature in enumerate(comparison['特性']):
        print(f"{feature:<15} {comparison['BERT'][i]:<25} {comparison['GPT'][i]:<25}")
    
    print("\n适用场景:")
    print("  • BERT:理解任务(分类、抽取、匹配)")
    print("  • GPT:生成任务(文本生成、对话、翻译)\n")


demonstrate_bert_mlm()

👁️ 第八章:Vision Transformer - 计算机视觉的新纪元

8.1 从CNN到Transformer

class PatchEmbedding(nn.Module):
    """图像patch嵌入"""
    
    def __init__(self, img_size=224, patch_size=16, in_channels=3, embed_dim=768):
        super().__init__()
        
        self.img_size = img_size
        self.patch_size = patch_size
        self.n_patches = (img_size // patch_size) ** 2
        
        # 使用卷积实现patch嵌入(等价于线性投影)
        self.proj = nn.Conv2d(
            in_channels,
            embed_dim,
            kernel_size=patch_size,
            stride=patch_size
        )
        
    def forward(self, x):
        """
        Args:
            x: [batch, channels, height, width]
        
        Returns:
            patches: [batch, n_patches, embed_dim]
        """
        # [batch, embed_dim, h/p, w/p]
        x = self.proj(x)
        
        # [batch, embed_dim, n_patches]
        x = x.flatten(2)
        
        # [batch, n_patches, embed_dim]
        x = x.transpose(1, 2)
        
        return x


class VisionTransformer(nn.Module):
    """Vision Transformer (ViT)"""
    
    def __init__(
        self,
        img_size=224,
        patch_size=16,
        in_channels=3,
        num_classes=1000,
        embed_dim=768,
        n_heads=12,
        n_layers=12,
        d_ff=3072,
        dropout=0.1
    ):
        super().__init__()
        
        # Patch嵌入
        self.patch_embed = PatchEmbedding(
            img_size, patch_size, in_channels, embed_dim
        )
        
        n_patches = self.patch_embed.n_patches
        
        # [CLS] token
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        
        # 位置嵌入(包含[CLS])
        self.pos_embed = nn.Parameter(torch.zeros(1, n_patches + 1, embed_dim))
        
        # Transformer编码器
        self.encoder_layers = nn.ModuleList([
            EncoderLayer(embed_dim, n_heads, d_ff, dropout)
            for _ in range(n_layers)
        ])
        
        self.norm = nn.LayerNorm(embed_dim)
        
        # 分类头
        self.head = nn.Linear(embed_dim, num_classes)
        
        self.dropout = nn.Dropout(dropout)
        
        # 初始化
        nn.init.trunc_normal_(self.pos_embed, std=0.02)
        nn.init.trunc_normal_(self.cls_token, std=0.02)
        
    def forward(self, x):
        """
        Args:
            x: [batch, channels, height, width]
        
        Returns:
            logits: [batch, num_classes]
        """
        batch_size = x.shape[0]
        
        # Patch嵌入
        x = self.patch_embed(x)  # [batch, n_patches, embed_dim]
        
        # 添加[CLS] token
        cls_tokens = self.cls_token.expand(batch_size, -1, -1)
        x = torch.cat([cls_tokens, x], dim=1)
        
        # 添加位置嵌入
        x = x + self.pos_embed
        x = self.dropout(x)
        
        # 通过Transformer编码器
        for layer in self.encoder_layers:
            x, _ = layer(x)
        
        x = self.norm(x)
        
        # 使用[CLS] token进行分类
        cls_output = x[:, 0]
        logits = self.head(cls_output)
        
        return logits


def demonstrate_vit():
    """演示Vision Transformer"""
    
    print("\n实验22:Vision Transformer原理\n")
    
    print("ViT核心思想:将图像视为序列")
    print("  1. 将图像分割成固定大小的patches")
    print("  2. 线性投影每个patch到嵌入空间")
    print("  3. 添加位置嵌入")
    print("  4. 通过标准Transformer编码器")
    print("  5. 使用[CLS] token进行分类\n")
    
    # 创建ViT
    model = VisionTransformer(
        img_size=224,
        patch_size=16,
        in_channels=3,
        num_classes=1000,
        embed_dim=768,
        n_heads=12,
        n_layers=12,
        d_ff=3072
    )
    
    print(f"模型参数量: {sum(p.numel() for p in model.parameters()):,}\n")
    
    # 模拟输入
    batch_size = 4
    img = torch.randn(batch_size, 3, 224, 224)
    
    print(f"输入图像: {img.shape}")
    print(f"  - [batch_size, channels, height, width]\n")
    
    # 前向传播
    with torch.no_grad():
        # Patch嵌入
        patches = model.patch_embed(img)
        print(f"Patch嵌入: {patches.shape}")
        
        n_patches = model.patch_embed.n_patches
        patch_size = model.patch_embed.patch_size
        patches_per_side = int(n_patches ** 0.5)
        
        print(f"  - Patch大小: {patch_size}×{patch_size}")
        print(f"  - 每边Patch数: {patches_per_side}")
        print(f"  - 总Patch数: {n_patches}")
        print(f"  - 嵌入维度: {patches.shape[-1]}\n")
        
        # 完整前向
        logits = model(img)
        print(f"输出logits: {logits.shape}")
        print(f"  - [batch_size, num_classes]\n")
    
    # 可视化patch分割
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # 原始图像(模拟)
    sample_img = torch.randn(3, 224, 224)
    sample_img = (sample_img - sample_img.min()) / (sample_img.max() - sample_img.min())
    
    axes[0].imshow(sample_img.permute(1, 2, 0).numpy())
    axes[0].set_title('原始图像 (224×224)', fontsize=13, fontweight='bold')
    axes[0].axis('off')
    
    # Patch网格
    axes[1].imshow(sample_img.permute(1, 2, 0).numpy())
    
    # 绘制网格线
    for i in range(0, 224, patch_size):
        axes[1].axhline(y=i, color='red', linewidth=2)
        axes[1].axvline(x=i, color='red', linewidth=2)
    
    axes[1].set_title(f'Patch分割 ({patches_per_side}×{patches_per_side})', 
                     fontsize=13, fontweight='bold')
    axes[1].axis('off')
    
    # Patch序列可视化
    with torch.no_grad():
        single_patch_embed = model.patch_embed(sample_img.unsqueeze(0))
        patch_matrix = single_patch_embed[0].numpy()  # [196, 768]
    
    axes[2].imshow(patch_matrix.T, aspect='auto', cmap='viridis')
    axes[2].set_title(f'Patch嵌入矩阵 ({n_patches}×{patches.shape[-1]})', 
                     fontsize=13, fontweight='bold')
    axes[2].set_xlabel('Patch索引')
    axes[2].set_ylabel('嵌入维度')
    
    plt.tight_layout()
    plt.show()
    
    print("ViT vs CNN对比:\n")
    
    comparison = {
        '特性': ['归纳偏置', '参数共享', '局部性', '全局建模', '数据需求', '可解释性'],
        'CNN': [
            '强(卷积+池化)',
            '高(卷积核共享)',
            '天然(感受野)',
            '需要深层堆叠',
            '相对较少',
            '较好(可视化滤波器)'
        ],
        'ViT': [
            '弱(仅位置编码)',
            '低(每层独立参数)',
            '需要学习',
            '天然(自注意力)',
            '大量(ImageNet-21K)',
            '一般(注意力图)'
        ]
    }
    
    print(f"{'特性':<15} {'CNN':<30} {'ViT':<30}")
    print("="*80)
    
    for i, feature in enumerate(comparison['特性']):
        print(f"{feature:<15} {comparison['CNN'][i]:<30} {comparison['ViT'][i]:<30}")
    
    print("\n关键发现(ViT论文):")
    print("  ✓ 小数据集:CNN表现更好(归纳偏置有利)")
    print("  ✓ 大数据集:ViT超越CNN(学习更通用的特征)")
    print("  ✓ 混合架构:早期卷积 + 后期Transformer效果最佳\n")


demonstrate_vit()

8.2 注意力图可视化

def visualize_vit_attention():
    """可视化ViT的注意力图"""
    
    print("\n实验23:ViT注意力图可视化\n")
    
    # 创建简化的ViT
    model = VisionTransformer(
        img_size=224,
        patch_size=16,
        in_channels=3,
        num_classes=10,  # 简化
        embed_dim=256,
        n_heads=8,
        n_layers=6
    )
    
    model.eval()
    
    # 模拟输入(单张图片)
    img = torch.randn(1, 3, 224, 224)
    
    # 修改模型以返回注意力权重
    attentions = []
    
    def hook_fn(module, input, output):
        # output[1] 是注意力权重
        attentions.append(output[1].detach())
    
    # 注册hook
    hooks = []
    for layer in model.encoder_layers:
        hook = layer.self_attention.register_forward_hook(hook_fn)
        hooks.append(hook)
    
    # 前向传播
    with torch.no_grad():
        _ = model(img)
    
    # 移除hook
    for hook in hooks:
        hook.remove()
    
    # 可视化不同层的注意力
    n_layers = len(attentions)
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()
    
    n_patches = model.patch_embed.n_patches
    patches_per_side = int(n_patches ** 0.5)
    
    for layer_idx in range(min(6, n_layers)):
        ax = axes[layer_idx]
        
        # 获取该层的注意力 [1, n_heads, seq_len, seq_len]
        attn = attentions[layer_idx][0]  # [n_heads, seq_len, seq_len]
        
        # 平均所有头
        attn_avg = attn.mean(dim=0)  # [seq_len, seq_len]
        
        # 关注[CLS] token对patches的注意力
        cls_attn = attn_avg[0, 1:]  # 去掉[CLS]自身
        
        # Reshape到2D
        cls_attn_2d = cls_attn.reshape(patches_per_side, patches_per_side)
        
        im = ax.imshow(cls_attn_2d.numpy(), cmap='hot', interpolation='nearest')
        ax.set_title(f'Layer {layer_idx + 1}', fontsize=12, fontweight='bold')
        ax.axis('off')
        plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
    
    plt.suptitle('[CLS] Token的注意力分布(不同层)', 
                fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()
    
    print("观察:")
    print("  • 早期层:注意力分散(学习局部特征)")
    print("  • 中间层:注意力聚焦到特定区域")
    print("  • 后期层:注意力高度集中(语义相关区域)\n")
    
    # 分析注意力距离
    print("注意力距离分析:\n")
    
    for layer_idx in range(n_layers):
        attn = attentions[layer_idx][0]  # [n_heads, seq_len, seq_len]
        
        # 计算平均注意力距离
        distances = []
        
        for head in range(attn.shape[0]):
            head_attn = attn[head, 1:, 1:]  # 去掉[CLS]
            
            # 计算每个patch关注的平均位置
            for i in range(n_patches):
                weights = head_attn[i]
                
                # 计算加权平均距离
                i_row, i_col = i // patches_per_side, i % patches_per_side
                
                total_distance = 0
                for j, w in enumerate(weights):
                    j_row, j_col = j // patches_per_side, j % patches_per_side
                    dist = abs(i_row - j_row) + abs(i_col - j_col)  # Manhattan距离
                    total_distance += w.item() * dist
                
                distances.append(total_distance)
        
        avg_distance = np.mean(distances)
        print(f"  Layer {layer_idx + 1}: 平均注意力距离 = {avg_distance:.2f} patches")
    
    print("\n结论:")
    print("  ✓ 深层网络学习更全局的特征(注意力距离增大)")
    print("  ✓ 不同头关注不同尺度的特征\n")


visualize_vit_attention()

继续第九章...


⚡ 第九章:高效Transformer - 稀疏注意力与线性复杂度

9.1 标准注意力的瓶颈

def analyze_attention_complexity():
    """分析标准注意力的复杂度瓶颈"""
    
    print("\n实验24:标准注意力的计算与内存瓶颈\n")
    
    print("标准自注意力:")
    print("  Attention(Q, K, V) = softmax(QK^T / √d_k) V\n")
    
    print("复杂度分析:")
    print("  • 时间复杂度: O(n² · d)")
    print("    - QK^T: O(n² · d)")
    print("    - softmax(...)V: O(n² · d)")
    print("  • 空间复杂度: O(n²)")
    print("    - 注意力矩阵: [n, n]\n")
    
    print("问题:序列长度n增长时,成本呈平方增长!\n")
    
    # 可视化不同序列长度的成本
    seq_lengths = [512, 1024, 2048, 4096, 8192, 16384]
    d_model = 512
    
    time_costs = []
    memory_costs = []
    
    for n in seq_lengths:
        # 时间(FLOPs)
        time = 2 * n**2 * d_model / 1e9  # GFLOPs
        time_costs.append(time)
        
        # 内存(注意力矩阵)
        memory = n**2 * 4 / 1024**2  # MB (float32)
        memory_costs.append(memory)
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
    
    # 计算成本
    ax1.plot(seq_lengths, time_costs, marker='o', linewidth=2.5, 
            color='red', markersize=8)
    ax1.set_xlabel('序列长度', fontsize=12)
    ax1.set_ylabel('计算量 (GFLOPs)', fontsize=12)
    ax1.set_title('计算成本随序列长度变化', fontsize=14, fontweight='bold')
    ax1.grid(True, alpha=0.3)
    ax1.set_xscale('log', base=2)
    ax1.set_yscale('log')
    
    # 标注关键点
    for x, y in zip(seq_lengths[::2], time_costs[::2]):
        ax1.annotate(f'{y:.1f}', xy=(x, y), xytext=(10, 10),
                    textcoords='offset points', fontsize=10,
                    bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.5))
    
    # 内存成本
    ax2.plot(seq_lengths, memory_costs, marker='s', linewidth=2.5,
            color='blue', markersize=8)
    ax2.set_xlabel('序列长度', fontsize=12)
    ax2.set_ylabel('内存占用 (MB)', fontsize=12)
    ax2.set_title('内存占用随序列长度变化', fontsize=14, fontweight='bold')
    ax2.grid(True, alpha=0.3)
    ax2.set_xscale('log', base=2)
    ax2.set_yscale('log')
    
    # 标注GPU内存限制
    ax2.axhline(y=16*1024, color='red', linestyle='--', linewidth=2,
               label='GPU内存限制 (16GB)')
    ax2.legend(fontsize=11)
    
    for x, y in zip(seq_lengths[::2], memory_costs[::2]):
        ax2.annotate(f'{y:.0f}', xy=(x, y), xytext=(10, 10),
                    textcoords='offset points', fontsize=10,
                    bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.5))
    
    plt.tight_layout()
    plt.show()
    
    print("实际限制:")
    print(f"  • seq_len=2048: 内存 {memory_costs[2]:.0f} MB")
    print(f"  • seq_len=8192: 内存 {memory_costs[4]:.0f} MB")
    print(f"  • seq_len=16384: 内存 {memory_costs[5]:.0f} MB (超出单GPU容量!)\n")
    
    print("应用场景需求:")
    print("  • 长文档理解: 10k+ tokens")
    print("  • 蛋白质序列: 数千残基")
    print("  • 高分辨率图像: 数万patches")
    print("  • 长视频: 数万帧\n")
    
    print("解决方案:")
    print("  1. 稀疏注意力")
    print("  2. 线性注意力")
    print("  3. 分块/滑动窗口注意力")
    print("  4. Flash Attention (硬件优化)\n")


analyze_attention_complexity()

9.2 稀疏注意力机制

class SparseAttentionPatterns:
    """稀疏注意力模式"""
    
    def __init__(self):
        pass
    
    def local_attention(self, seq_len, window_size):
        """局部注意力(滑动窗口)"""
        mask = torch.zeros(seq_len, seq_len)
        
        for i in range(seq_len):
            start = max(0, i - window_size // 2)
            end = min(seq_len, i + window_size // 2 + 1)
            mask[i, start:end] = 1
        
        return mask
    
    def strided_attention(self, seq_len, stride):
        """步幅注意力"""
        mask = torch.zeros(seq_len, seq_len)
        
        for i in range(seq_len):
            # 每隔stride个位置
            indices = list(range(0, seq_len, stride))
            if i not in indices:
                indices.append(i)
            mask[i, indices] = 1
        
        return mask
    
    def block_sparse_attention(self, seq_len, block_size):
        """块稀疏注意力"""
        n_blocks = seq_len // block_size
        mask = torch.zeros(seq_len, seq_len)
        
        for i in range(n_blocks):
            for j in range(n_blocks):
                # 对角块和相邻块
                if abs(i - j) <= 1:
                    start_i = i * block_size
                    end_i = (i + 1) * block_size
                    start_j = j * block_size
                    end_j = (j + 1) * block_size
                    
                    mask[start_i:end_i, start_j:end_j] = 1
        
        return mask
    
    def visualize_patterns(self):
        """可视化不同稀疏模式"""
        
        print("\n实验25:稀疏注意力模式\n")
        
        seq_len = 64
        
        # 生成不同模式
        full_mask = torch.ones(seq_len, seq_len)
        local_mask = self.local_attention(seq_len, window_size=8)
        strided_mask = self.strided_attention(seq_len, stride=4)
        block_mask = self.block_sparse_attention(seq_len, block_size=8)
        
        # 计算稀疏度
        def sparsity(mask):
            return (1 - mask.sum() / mask.numel()) * 100
        
        fig, axes = plt.subplots(2, 2, figsize=(12, 12))
        
        patterns = [
            ('全注意力 (标准)', full_mask),
            ('局部注意力 (窗口=8)', local_mask),
            ('步幅注意力 (步长=4)', strided_mask),
            ('块稀疏注意力 (块=8)', block_mask)
        ]
        
        for ax, (title, mask) in zip(axes.flatten(), patterns):
            im = ax.imshow(mask.numpy(), cmap='Greys', interpolation='nearest')
            ax.set_title(f'{title}\n稀疏度: {sparsity(mask):.1f}%', 
                        fontsize=12, fontweight='bold')
            ax.set_xlabel('Key位置')
            ax.set_ylabel('Query位置')
            plt.colorbar(im, ax=ax)
        
        plt.tight_layout()
        plt.show()
        
        # 复杂度分析
        print("复杂度对比:\n")
        
        print(f"{'模式':<20} {'时间复杂度':<20} {'空间复杂度':<20} {'稀疏度':<15}")
        print("="*80)
        
        complexities = [
            ('全注意力', 'O(n²·d)', 'O(n²)', f'{sparsity(full_mask):.1f}%'),
            ('局部注意力', 'O(n·w·d)', 'O(n·w)', f'{sparsity(local_mask):.1f}%'),
            ('步幅注意力', 'O(n·n/s·d)', 'O(n·n/s)', f'{sparsity(strided_mask):.1f}%'),
            ('块稀疏', 'O(n·b·d)', 'O(n·b)', f'{sparsity(block_mask):.1f}%')
        ]
        
        for name, time, space, sparse in complexities:
            print(f"{name:<20} {time:<20} {space:<20} {sparse:<15}")
        
        print("\n图例:")
        print("  n: 序列长度")
        print("  d: 特征维度")
        print("  w: 窗口大小")
        print("  s: 步幅")
        print("  b: 块大小\n")


class LongformerAttention(nn.Module):
    """Longformer的注意力机制(局部+全局)"""
    
    def __init__(self, d_model, n_heads, window_size, n_global_tokens=1):
        super().__init__()
        
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.window_size = window_size
        self.n_global_tokens = n_global_tokens
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
    
    def forward(self, x):
        """
        Args:
            x: [batch, seq_len, d_model]
        
        Returns:
            output: [batch, seq_len, d_model]
        """
        batch_size, seq_len, _ = x.shape
        
        # Q, K, V投影
        Q = self.W_q(x).view(batch_size, seq_len, self.n_heads, self.d_k)
        K = self.W_k(x).view(batch_size, seq_len, self.n_heads, self.d_k)
        V = self.W_v(x).view(batch_size, seq_len, self.n_heads, self.d_k)
        
        Q = Q.transpose(1, 2)  # [batch, n_heads, seq_len, d_k]
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)
        
        # 简化实现:仅演示局部窗口
        # 实际Longformer使用更高效的滑动窗口实现
        
        output = torch.zeros_like(Q)
        
        for i in range(seq_len):
            # 局部窗口
            start = max(0, i - self.window_size // 2)
            end = min(seq_len, i + self.window_size // 2 + 1)
            
            # 全局tokens(如[CLS])
            if i < self.n_global_tokens:
                # 全局token关注所有位置
                k_window = K
                v_window = V
            else:
                k_window = K[:, :, start:end, :]
                v_window = V[:, :, start:end, :]
            
            q_i = Q[:, :, i:i+1, :]
            
            # 注意力计算
            scores = torch.matmul(q_i, k_window.transpose(-2, -1)) / math.sqrt(self.d_k)
            attn = F.softmax(scores, dim=-1)
            
            output[:, :, i:i+1, :] = torch.matmul(attn, v_window)
        
        output = output.transpose(1, 2).contiguous()
        output = output.view(batch_size, seq_len, self.d_model)
        
        output = self.W_o(output)
        
        return output


sparse_patterns = SparseAttentionPatterns()
sparse_patterns.visualize_patterns()

9.3 线性注意力

class LinearAttention(nn.Module):
    """线性注意力(Performer风格)"""
    
    def __init__(self, d_model, n_heads, n_features=256):
        super().__init__()
        
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.n_features = n_features
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        
    def kernel_feature_map(self, x):
        """特征映射函数(简化的随机特征)"""
        # φ(x) = exp(x²/2) / √d_k
        # 实际Performer使用更复杂的正交随机特征
        
        return F.elu(x) + 1  # 简化版本,确保非负
    
    def forward(self, x):
        """
        线性注意力核心思想:
        Attention(Q, K, V) = φ(Q) · (φ(K)^T · V) / (φ(Q) · φ(K)^T · 1)
        
        复杂度:O(n · d²) 而非 O(n² · d)
        """
        batch_size, seq_len, _ = x.shape
        
        Q = self.W_q(x).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        K = self.W_k(x).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        V = self.W_v(x).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        
        # 应用特征映射
        Q_prime = self.kernel_feature_map(Q)  # [batch, n_heads, seq_len, d_k]
        K_prime = self.kernel_feature_map(K)
        
        # 关键:改变计算顺序
        # 标准: (Q @ K^T) @ V  ->  O(n²d)
        # 线性: Q @ (K^T @ V)  ->  O(nd²)
        
        # K^T @ V: [batch, n_heads, d_k, d_k]
        KV = torch.matmul(K_prime.transpose(-2, -1), V)
        
        # Q @ KV: [batch, n_heads, seq_len, d_k]
        QKV = torch.matmul(Q_prime, KV)
        
        # 归一化
        K_sum = K_prime.sum(dim=-2, keepdim=True).transpose(-2, -1)  # [batch, n_heads, d_k, 1]
        normalizer = torch.matmul(Q_prime, K_sum)  # [batch, n_heads, seq_len, 1]
        
        output = QKV / (normalizer + 1e-6)
        
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        output = self.W_o(output)
        
        return output


def compare_attention_mechanisms():
    """对比不同注意力机制"""
    
    print("\n实验26:高效注意力机制对比\n")
    
    d_model = 512
    n_heads = 8
    batch_size = 4
    
    # 测试不同序列长度
    seq_lengths = [128, 256, 512, 1024, 2048, 4096]
    
    # 创建模型
    standard_attn = MultiHeadAttention(d_model, n_heads)
    linear_attn = LinearAttention(d_model, n_heads)
    longformer_attn = LongformerAttention(d_model, n_heads, window_size=256)
    
    import time
    
    standard_times = []
    linear_times = []
    longformer_times = []
    
    print("性能测试进行中...\n")
    
    for seq_len in seq_lengths:
        x = torch.randn(batch_size, seq_len, d_model)
        
        # 标准注意力
        start = time.time()
        with torch.no_grad():
            for _ in range(10):
                _ = standard_attn(x, x, x)
        standard_time = (time.time() - start) / 10
        standard_times.append(standard_time * 1000)
        
        # 线性注意力
        start = time.time()
        with torch.no_grad():
            for _ in range(10):
                _ = linear_attn(x)
        linear_time = (time.time() - start) / 10
        linear_times.append(linear_time * 1000)
        
        # Longformer(仅在seq_len不太长时测试)
        if seq_len <= 1024:
            start = time.time()
            with torch.no_grad():
                for _ in range(10):
                    _ = longformer_attn(x)
            longformer_time = (time.time() - start) / 10
            longformer_times.append(longformer_time * 1000)
        else:
            longformer_times.append(None)
    
    # 可视化
    fig, ax = plt.subplots(figsize=(12, 6))
    
    ax.plot(seq_lengths, standard_times, marker='o', linewidth=2.5,
           label='标准注意力 O(n²d)', color='red', markersize=8)
    ax.plot(seq_lengths, linear_times, marker='s', linewidth=2.5,
           label='线性注意力 O(nd²)', color='blue', markersize=8)
    
    # Longformer(部分数据)
    valid_longformer = [(sl, t) for sl, t in zip(seq_lengths, longformer_times) if t is not None]
    if valid_longformer:
        sl_long, t_long = zip(*valid_longformer)
        ax.plot(sl_long, t_long, marker='^', linewidth=2.5,
               label='Longformer O(nwd)', color='green', markersize=8)
    
    ax.set_xlabel('序列长度', fontsize=12)
    ax.set_ylabel('推理时间 (ms)', fontsize=12)
    ax.set_title('不同注意力机制的性能对比', fontsize=14, fontweight='bold')
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)
    ax.set_xscale('log', base=2)
    ax.set_yscale('log')
    
    plt.tight_layout()
    plt.show()
    
    # 打印详细结果
    print("详细性能数据:\n")
    print(f"{'序列长度':<12} {'标准注意力':<15} {'线性注意力':<15} {'Longformer':<15}")
    print("="*60)
    
    for i, sl in enumerate(seq_lengths):
        longformer_str = f"{longformer_times[i]:.2f} ms" if longformer_times[i] else "N/A"
        print(f"{sl:<12} {standard_times[i]:<15.2f} {linear_times[i]:<15.2f} {longformer_str:<15}")
    
    print("\n优势分析:")
    print("  • 标准注意力:seq_len < 512时最快(高度优化)")
    print("  • 线性注意力:seq_len > 2048时显著更快")
    print("  • Longformer:平衡性能与准确性\n")
    
    # 内存对比
    print("内存占用对比(seq_len=4096):\n")
    
    seq_len = 4096
    
    standard_mem = batch_size * n_heads * seq_len * seq_len * 4 / 1024**2
    linear_mem = batch_size * n_heads * d_model // n_heads * d_model // n_heads * 4 / 1024**2
    longformer_mem = batch_size * n_heads * seq_len * 256 * 4 / 1024**2
    
    print(f"  标准注意力: {standard_mem:.2f} MB")
    print(f"  线性注意力: {linear_mem:.2f} MB")
    print(f"  Longformer:  {longformer_mem:.2f} MB")
    print(f"\n  节省内存: {(1 - linear_mem/standard_mem)*100:.1f}% (线性)")
    print(f"            {(1 - longformer_mem/standard_mem)*100:.1f}% (Longformer)\n")


compare_attention_mechanisms()

🎨 第十章:Transformer在多模态中的应用

10.1 CLIP:连接视觉与语言

class CLIPModel(nn.Module):
    """CLIP: Contrastive Language-Image Pre-training"""
    
    def __init__(
        self,
        # 图像编码器参数
        img_size=224,
        patch_size=16,
        img_embed_dim=768,
        img_n_layers=12,
        img_n_heads=12,
        # 文本编码器参数
        vocab_size=49408,
        text_embed_dim=512,
        text_n_layers=12,
        text_n_heads=8,
        max_text_len=77,
        # 共同参数
        projection_dim=512
    ):
        super().__init__()
        
        # 图像编码器(ViT)
        self.image_encoder = VisionTransformer(
            img_size=img_size,
            patch_size=patch_size,
            in_channels=3,
            num_classes=projection_dim,  # 投影维度
            embed_dim=img_embed_dim,
            n_heads=img_n_heads,
            n_layers=img_n_layers
        )
        
        # 文本编码器(Transformer)
        self.text_encoder = BERTStyleEncoder(
            vocab_size=vocab_size,
            d_model=text_embed_dim,
            n_heads=text_n_heads,
            n_layers=text_n_layers,
            max_len=max_text_len
        )
        
        # 投影层
        self.text_projection = nn.Linear(text_embed_dim, projection_dim)
        
        # 可学习的温度参数
        self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))
        
    def encode_image(self, images):
        """
        Args:
            images: [batch, 3, H, W]
        
        Returns:
            image_features: [batch, projection_dim]
        """
        image_features = self.image_encoder(images)
        
        # L2归一化
        image_features = F.normalize(image_features, p=2, dim=-1)
        
        return image_features
    
    def encode_text(self, text_tokens):
        """
        Args:
            text_tokens: [batch, max_len]
        
        Returns:
            text_features: [batch, projection_dim]
        """
        # 通过文本编码器
        hidden_states = self.text_encoder(text_tokens)
        
        # 使用[CLS] token
        text_features = hidden_states[:, 0, :]
        
        # 投影
        text_features = self.text_projection(text_features)
        
        # L2归一化
        text_features = F.normalize(text_features, p=2, dim=-1)
        
        return text_features
    
    def forward(self, images, text_tokens):
        """
        对比学习前向传播
        
        Args:
            images: [batch, 3, H, W]
            text_tokens: [batch, max_len]
        
        Returns:
            logits_per_image: [batch, batch]
            logits_per_text: [batch, batch]
        """
        # 编码
        image_features = self.encode_image(images)
        text_features = self.encode_text(text_tokens)
        
        # 计算相似度矩阵
        # [batch, batch]
        logit_scale = self.logit_scale.exp()
        logits_per_image = logit_scale * image_features @ text_features.t()
        logits_per_text = logits_per_image.t()
        
        return logits_per_image, logits_per_text


def demonstrate_clip():
    """演示CLIP的工作原理"""
    
    print("\n实验27:CLIP多模态学习\n")
    
    print("CLIP核心思想:")
    print("  通过对比学习连接图像和文本")
    print("  正样本:匹配的图像-文本对")
    print("  负样本:不匹配的图像-文本对\n")
    
    # 创建CLIP模型
    clip = CLIPModel(
        img_embed_dim=512,
        img_n_layers=6,
        text_embed_dim=512,
        text_n_layers=6,
        projection_dim=256
    )
    
    print(f"图像编码器参数: {sum(p.numel() for p in clip.image_encoder.parameters()):,}")
    print(f"文本编码器参数: {sum(p.numel() for p in clip.text_encoder.parameters()):,}")
    print(f"总参数量: {sum(p.numel() for p in clip.parameters()):,}\n")
    
    # 模拟批次
    batch_size = 4
    images = torch.randn(batch_size, 3, 224, 224)
    text_tokens = torch.randint(0, 49408, (batch_size, 77))
    
    # 前向传播
    with torch.no_grad():
        logits_per_image, logits_per_text = clip(images, text_tokens)
    
    print(f"图像-文本相似度矩阵: {logits_per_image.shape}\n")
    
    # 可视化相似度矩阵
    similarities = logits_per_image.detach().numpy()
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
    
    # 相似度热图
    im1 = ax1.imshow(similarities, cmap='RdYlGn', aspect='auto')
    ax1.set_title('图像-文本相似度矩阵', fontsize=13, fontweight='bold')
    ax1.set_xlabel('文本索引', fontsize=11)
    ax1.set_ylabel('图像索引', fontsize=11)
    ax1.set_xticks(range(batch_size))
    ax1.set_yticks(range(batch_size))
    
    # 标注数值
    for i in range(batch_size):
        for j in range(batch_size):
            text = f'{similarities[i, j]:.2f}'
            color = 'white' if abs(similarities[i, j]) < 5 else 'black'
            ax1.text(j, i, text, ha='center', va='center', 
                    color=color, fontsize=10, fontweight='bold')
    
    plt.colorbar(im1, ax=ax1)
    
    # Softmax后的概率
    probs = F.softmax(torch.tensor(similarities), dim=1).numpy()
    
    im2 = ax2.imshow(probs, cmap='Blues', aspect='auto')
    ax2.set_title('图像→文本匹配概率', fontsize=13, fontweight='bold')
    ax2.set_xlabel('文本索引', fontsize=11)
    ax2.set_ylabel('图像索引', fontsize=11)
    ax2.set_xticks(range(batch_size))
    ax2.set_yticks(range(batch_size))
    
    for i in range(batch_size):
        for j in range(batch_size):
            text = f'{probs[i, j]:.2f}'
            color = 'white' if probs[i, j] < 0.5 else 'black'
            ax2.text(j, i, text, ha='center', va='center',
                    color=color, fontsize=10, fontweight='bold')
    
    plt.colorbar(im2, ax=ax2)
    plt.tight_layout()
    plt.show()
    
    print("对比损失(InfoNCE):")
    print("  L = -log( exp(sim(i,i)/τ) / Σⱼ exp(sim(i,j)/τ) )")
    print("  其中:")
    print("    i: 正样本对")
    print("    j: 所有样本")
    print("    τ: 温度参数\n")
    
    # 计算损失
    labels = torch.arange(batch_size)
    loss_img = F.cross_entropy(logits_per_image, labels)
    loss_txt = F.cross_entropy(logits_per_text, labels)
    total_loss = (loss_img + loss_txt) / 2
    
    print(f"图像→文本损失: {loss_img.item():.4f}")
    print(f"文本→图像损失: {loss_txt.item():.4f}")
    print(f"总损失: {total_loss.item():.4f}\n")
    
    print("CLIP的应用:")
    print("  1. 零样本图像分类")
    print("     - 将类别名转换为文本嵌入")
    print("     - 计算图像与所有类别的相似度")
    print("  2. 图像检索")
    print("     - 文本查询 → 检索相似图像")
    print("  3. 视觉问答")
    print("  4. 图像生成引导(DALL-E 2)\n")


demonstrate_clip()

10.2 多模态Transformer:Flamingo

class CrossAttentionLayer(nn.Module):
    """交叉注意力层(视觉→语言)"""
    
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        
        self.cross_attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, text_hidden, vision_hidden, mask=None):
        """
        Args:
            text_hidden: [batch, text_len, d_model] (Query)
            vision_hidden: [batch, vision_len, d_model] (Key, Value)
            mask: 注意力掩码
        
        Returns:
            output: [batch, text_len, d_model]
        """
        # 交叉注意力:文本关注视觉特征
        attn_output, attn_weights = self.cross_attention(
            text_hidden,      # Q from text
            vision_hidden,    # K from vision
            vision_hidden,    # V from vision
            mask
        )
        
        # 残差连接 + 层归一化
        output = self.norm(text_hidden + self.dropout(attn_output))
        
        return output, attn_weights


class MultimodalTransformerBlock(nn.Module):
    """多模态Transformer块"""
    
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        
        # 自注意力(文本)
        self.self_attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        
        # 交叉注意力(视觉→文本)
        self.cross_attention_layer = CrossAttentionLayer(d_model, n_heads, dropout)
        
        # FFN
        self.ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.norm2 = nn.LayerNorm(d_model)
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, text_hidden, vision_hidden, self_attn_mask=None, cross_attn_mask=None):
        """
        Args:
            text_hidden: [batch, text_len, d_model]
            vision_hidden: [batch, vision_len, d_model]
        
        Returns:
            output: [batch, text_len, d_model]
        """
        # 1. 自注意力(文本内部)
        self_attn_out, _ = self.self_attention(text_hidden, text_hidden, text_hidden, self_attn_mask)
        text_hidden = self.norm1(text_hidden + self.dropout(self_attn_out))
        
        # 2. 交叉注意力(视觉→文本)
        cross_attn_out, _ = self.cross_attention_layer(text_hidden, vision_hidden, cross_attn_mask)
        
        # 3. FFN
        ffn_out = self.ffn(cross_attn_out)
        output = self.norm2(cross_attn_out + self.dropout(ffn_out))
        
        return output


def demonstrate_multimodal_transformer():
    """演示多模态Transformer"""
    
    print("\n实验28:多模态Transformer架构\n")
    
    print("Flamingo架构特点:")
    print("  1. 冻结的视觉编码器(预训练ViT)")
    print("  2. 冻结的语言模型(预训练GPT)")
    print("  3. 交叉注意力层(连接视觉和语言)")
    print("  4. 仅训练交叉注意力参数\n")
    
    # 参数
    d_model = 512
    n_heads = 8
    batch_size = 2
    text_len = 20
    vision_len = 196  # 14×14 patches
    
    # 创建模块
    multimodal_block = MultimodalTransformerBlock(
        d_model=d_model,
        n_heads=n_heads,
        d_ff=2048
    )
    
    # 模拟输入
    text_hidden = torch.randn(batch_size, text_len, d_model)
    vision_hidden = torch.randn(batch_size, vision_len, d_model)
    
    # 前向传播
    with torch.no_grad():
        output = multimodal_block(text_hidden, vision_hidden)
    
    print(f"文本输入: {text_hidden.shape}")
    print(f"视觉输入: {vision_hidden.shape}")
    print(f"输出: {output.shape}\n")
    
    # 可视化交叉注意力
    cross_attn_layer = CrossAttentionLayer(d_model, n_heads)
    
    with torch.no_grad():
        _, cross_attn_weights = cross_attn_layer(text_hidden, vision_hidden)
    
    # 平均所有头
    avg_cross_attn = cross_attn_weights[0].mean(dim=0).numpy()  # [text_len, vision_len]
    
    # Reshape视觉维度到2D
    patches_per_side = int(vision_len ** 0.5)
    
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()
    
    # 选择几个文本位置
    text_positions = [0, 5, 10, 15, 19]
    text_labels = ['[IMG]', 'What', 'is', 'in', '?']
    
    for i, (pos, label) in enumerate(zip(text_positions, text_labels)):
        ax = axes[i]
        
        # 该文本位置对所有视觉patch的注意力
        attn_to_vision = avg_cross_attn[pos].reshape(patches_per_side, patches_per_side)
        
        im = ax.imshow(attn_to_vision, cmap='hot', interpolation='nearest')
        ax.set_title(f'文本位置 {pos}: "{label}"', fontsize=12, fontweight='bold')
        ax.axis('off')
        plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
    
    # 最后一个子图:平均注意力
    avg_attn_all = avg_cross_attn.mean(axis=0).reshape(patches_per_side, patches_per_side)
    im = axes[5].imshow(avg_attn_all, cmap='hot', interpolation='nearest')
    axes[5].set_title('平均注意力分布', fontsize=12, fontweight='bold')
    axes[5].axis('off')
    plt.colorbar(im, ax=axes[5], fraction=0.046, pad=0.04)
    
    plt.suptitle('交叉注意力:文本如何关注图像', fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()
    
    print("观察:")
    print("  • '[IMG]' token:平均关注整个图像")
    print("  • 'What', 'is' 等词:关注特定区域")
    print("  • 模型学会将文本与相关视觉区域对齐\n")
    
    print("Flamingo的Few-shot学习:")
    print("  输入格式:")
    print("    <image1> Caption: A cat. <image2> Caption: A dog.")
    print("    <image3> Caption: ")
    print("  输出:")
    print("    A bird. (模型生成)\n")
    
    print("参数效率:")
    total_params = sum(p.numel() for p in multimodal_block.parameters())
    cross_attn_params = sum(p.numel() for p in multimodal_block.cross_attention_layer.parameters())
    
    print(f"  总参数: {total_params:,}")
    print(f"  交叉注意力参数: {cross_attn_params:,}")
    print(f"  占比: {cross_attn_params/total_params*100:.1f}%")
    print(f"  → 仅需训练{cross_attn_params/total_params*100:.1f}%的参数!\n")


demonstrate_multimodal_transformer()

🚀 第十一章:大模型时代 - 规模法则与涌现能力

11.2 涌现能力(Emergent Abilities)

def demonstrate_emergent_abilities():
    """演示大模型的涌现能力"""
    
    print("\n实验30:涌现能力 - 量变到质变\n")
    
    print("什么是涌现能力?")
    print("  当模型规模超过某个阈值时,突然出现的能力")
    print("  小模型:几乎随机猜测 (0% 准确率)")
    print("  大模型:显著超过随机 (>60% 准确率)\n")
    
    # 模拟不同任务的涌现曲线
    model_scales = [0.1, 0.5, 1, 7, 13, 70, 175, 540]  # 参数量(B)
    model_labels = ['100M', '500M', '1B', '7B', '13B', '70B', '175B', '540B']
    
    # 不同任务的表现
    tasks = {
        '3位数加法': [5, 8, 12, 15, 28, 85, 95, 98],
        '5-shot翻译': [10, 12, 15, 20, 35, 65, 80, 88],
        'CoT推理': [5, 5, 8, 12, 25, 70, 85, 92],
        '复杂推理': [5, 5, 5, 8, 15, 40, 75, 88]
    }
    
    fig, ax = plt.subplots(figsize=(12, 7))
    
    colors = ['blue', 'green', 'red', 'purple']
    
    for (task_name, scores), color in zip(tasks.items(), colors):
        ax.plot(model_scales, scores, marker='o', linewidth=2.5,
               label=task_name, color=color, markersize=8)
    
    # 添加随机基线
    ax.axhline(y=25, color='gray', linestyle='--', linewidth=2, label='随机猜测')
    
    ax.set_xlabel('模型参数量 (B)', fontsize=12)
    ax.set_ylabel('任务准确率 (%)', fontsize=12)
    ax.set_title('涌现能力:性能随模型规模的突变', fontsize=14, fontweight='bold')
    ax.set_xscale('log')
    ax.legend(fontsize=11, loc='lower right')
    ax.grid(True, alpha=0.3)
    ax.set_ylim(0, 100)
    
    plt.tight_layout()
    plt.show()
    
    print("观察到的涌现模式:")
    print("  1. 小模型阶段(<1B):接近随机水平")
    print("  2. 过渡阶段(1B-13B):缓慢提升")
    print("  3. 涌现阶段(>13B):性能突然大幅跃升")
    print("  4. 大模型阶段(>70B):持续改进但增速放缓\n")
    
    print("Chain-of-Thought (CoT) 示例:\n")
    
    print("问题:罗杰有5个网球。他又买了2罐网球,每罐3个球。他现在有多少个网球?\n")
    
    print("小模型(直接回答):")
    print("  → 10个球 ❌ (错误)\n")
    
    print("大模型(CoT推理):")
    print("  → 让我一步步思考:")
    print("     1. 罗杰最初有5个球")
    print("     2. 他买了2罐,每罐3个球")
    print("     3. 2罐 × 3个/罐 = 6个新球")
    print("     4. 总共:5 + 6 = 11个球")
    print("  → 答案:11个球 ✓ (正确)\n")
    
    print("涌现能力的理论解释:")
    print("  • 相变理论:模型容量达到临界点")
    print("  • 记忆假说:大模型记住了更多模式")
    print("  • 组合假说:简单能力组合产生复杂能力")
    print("  • 当前共识:可能是多种因素综合作用\n")


demonstrate_emergent_abilities()

11.3 大模型训练技术

class LargeModelTrainingTechniques:
    """大模型训练技术集合"""
    
    def __init__(self):
        pass
    
    def explain_mixed_precision(self):
        """混合精度训练"""
        
        print("\n实验31:混合精度训练\n")
        
        print("动机:")
        print("  • FP32(单精度):高精度,但内存占用大")
        print("  • FP16(半精度):节省内存,但数值范围小\n")
        
        print("混合精度策略:")
        print("  1. 前向传播:FP16")
        print("  2. 损失计算:FP16")
        print("  3. 反向传播:FP16")
        print("  4. 权重更新:FP32 (主副本)\n")
        
        print("关键技术:")
        print("  • Loss Scaling:放大损失防止梯度下溢")
        print("    scaled_loss = loss × scale_factor")
        print("    gradient = gradient / scale_factor")
        print("  • 动态调整:自动调节scale_factor\n")
        
        # 模拟内存节省
        model_sizes = [1, 7, 13, 70, 175]  # B参数
        
        fp32_memory = [s * 4 for s in model_sizes]  # 4 bytes per param
        fp16_memory = [s * 2 for s in model_sizes]  # 2 bytes per param
        
        fig, ax = plt.subplots(figsize=(10, 6))
        
        x = np.arange(len(model_sizes))
        width = 0.35
        
        ax.bar(x - width/2, fp32_memory, width, label='FP32', 
               color='red', alpha=0.7)
        ax.bar(x + width/2, fp16_memory, width, label='FP16', 
               color='green', alpha=0.7)
        
        ax.set_xlabel('模型大小 (B参数)', fontsize=12)
        ax.set_ylabel('内存占用 (GB)', fontsize=12)
        ax.set_title('混合精度带来的内存节省', fontsize=14, fontweight='bold')
        ax.set_xticks(x)
        ax.set_xticklabels([f'{s}B' for s in model_sizes])
        ax.legend(fontsize=11)
        ax.grid(True, alpha=0.3, axis='y')
        
        # 标注节省比例
        for i, (fp32, fp16) in enumerate(zip(fp32_memory, fp16_memory)):
            saving = (1 - fp16/fp32) * 100
            ax.text(i, max(fp32, fp16) + 10, f'↓{saving:.0f}%', 
                   ha='center', fontsize=10, fontweight='bold', color='blue')
        
        plt.tight_layout()
        plt.show()
        
        print("实际收益:")
        print("  • 内存:减少50%")
        print("  • 速度:提升2-3× (Tensor Core加速)")
        print("  • 精度:几乎无损 (配合FP32主副本)\n")
    
    def explain_gradient_checkpointing(self):
        """梯度检查点"""
        
        print("\n实验32:梯度检查点 (Activation Checkpointing)\n")
        
        print("问题:")
        print("  训练需要存储所有中间激活值用于反向传播")
        print("  大模型:激活值内存 >> 参数内存\n")
        
        print("解决方案:")
        print("  1. 前向传播:只保存部分检查点")
        print("  2. 反向传播:重新计算丢弃的激活值")
        print("  3. 权衡:时间换空间\n")
        
        # 可视化
        n_layers = 24
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
        
        # 标准训练
        layers = list(range(n_layers))
        activations_standard = [1] * n_layers
        
        ax1.bar(layers, activations_standard, color='red', alpha=0.7)
        ax1.set_xlabel('层编号', fontsize=11)
        ax1.set_ylabel('存储激活值', fontsize=11)
        ax1.set_title('标准训练:存储所有激活值', fontsize=12, fontweight='bold')
        ax1.set_ylim(0, 1.5)
        ax1.text(n_layers/2, 1.2, f'总内存:{n_layers}×', 
                ha='center', fontsize=12, bbox=dict(boxstyle='round', facecolor='yellow'))
        
        # 梯度检查点
        checkpoint_interval = 4
        activations_checkpoint = [1 if i % checkpoint_interval == 0 else 0.3 
                                  for i in range(n_layers)]
        
        colors = ['green' if i % checkpoint_interval == 0 else 'lightblue' 
                 for i in range(n_layers)]
        ax2.bar(layers, activations_checkpoint, color=colors, alpha=0.7)
        ax2.set_xlabel('层编号', fontsize=11)
        ax2.set_ylabel('存储激活值', fontsize=11)
        ax2.set_title('梯度检查点:仅存储检查点', fontsize=12, fontweight='bold')
        ax2.set_ylim(0, 1.5)
        
        n_checkpoints = n_layers // checkpoint_interval + 1
        ax2.text(n_layers/2, 1.2, f'总内存:{n_checkpoints}× (节省{(1-n_checkpoints/n_layers)*100:.0f}%)', 
                ha='center', fontsize=12, bbox=dict(boxstyle='round', facecolor='lightgreen'))
        
        plt.tight_layout()
        plt.show()
        
        print("配置策略:")
        print("  • 检查点间隔 = √n_layers (经验法则)")
        print("  • GPT-3 (96层): 每8-12层设置检查点")
        print("  • 内存节省:70-80%")
        print("  • 时间开销:20-30%\n")
    
    def explain_parallelism(self):
        """并行训练策略"""
        
        print("\n实验33:大模型并行策略\n")
        
        print("数据并行 (Data Parallelism):")
        print("  • 每个GPU持有完整模型副本")
        print("  • 不同GPU处理不同数据")
        print("  • 梯度同步后更新参数")
        print("  • 限制:单GPU必须容纳整个模型\n")
        
        print("模型并行 (Model Parallelism):")
        print("  1. 张量并行 (Tensor Parallelism)")
        print("     - 单层内部切分(如注意力头、FFN)")
        print("     - 需要频繁通信")
        print("  2. 流水线并行 (Pipeline Parallelism)")
        print("     - 按层切分到不同GPU")
        print("     - 微批次流水线执行\n")
        
        # 可视化三种并行
        fig, axes = plt.subplots(1, 3, figsize=(15, 4))
        
        # 数据并行
        ax = axes[0]
        for i in range(4):
            rect = plt.Rectangle((i*0.22, 0.3), 0.2, 0.4, 
                                facecolor='lightblue', edgecolor='black', linewidth=2)
            ax.add_patch(rect)
            ax.text(i*0.22 + 0.1, 0.5, f'GPU{i}\n完整\n模型', 
                   ha='center', va='center', fontsize=9, fontweight='bold')
            ax.text(i*0.22 + 0.1, 0.1, f'Batch{i}', 
                   ha='center', fontsize=8, style='italic')
        
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.set_title('数据并行', fontsize=12, fontweight='bold')
        ax.axis('off')
        ax.text(0.5, 0.85, '梯度同步 ↔', ha='center', fontsize=10, 
               bbox=dict(boxstyle='round', facecolor='yellow'))
        
        # 张量并行
        ax = axes[1]
        colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#FFA07A']
        layer_names = ['Attn1', 'Attn2', 'FFN1', 'FFN2']
        
        for i in range(4):
            rect = plt.Rectangle((0.1, 0.7 - i*0.18), 0.8, 0.15,
                                facecolor=colors[i], edgecolor='black', linewidth=2, alpha=0.7)
            ax.add_patch(rect)
            ax.text(0.5, 0.7 - i*0.18 + 0.075, f'{layer_names[i]} (分片到4个GPU)', 
                   ha='center', va='center', fontsize=9, fontweight='bold')
        
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.set_title('张量并行', fontsize=12, fontweight='bold')
        ax.axis('off')
        
        # 流水线并行
        ax = axes[2]
        for i in range(4):
            rect = plt.Rectangle((0.1, 0.7 - i*0.18), 0.8, 0.15,
                                facecolor='lightgreen', edgecolor='black', linewidth=2)
            ax.add_patch(rect)
            ax.text(0.5, 0.7 - i*0.18 + 0.075, f'GPU{i}: Layers {i*6}-{(i+1)*6-1}', 
                   ha='center', va='center', fontsize=9, fontweight='bold')
        
        # 流水线箭头
        for i in range(3):
            ax.annotate('', xy=(0.5, 0.52 - i*0.18), xytext=(0.5, 0.58 - i*0.18),
                       arrowprops=dict(arrowstyle='->', lw=2, color='red'))
        
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.set_title('流水线并行', fontsize=12, fontweight='bold')
        ax.axis('off')
        
        plt.tight_layout()
        plt.show()
        
        print("实际部署(GPT-3规模):")
        print("  • 数据并行度:64 (不同节点)")
        print("  • 张量并行度:8 (节点内GPU)")
        print("  • 流水线并行度:16 (跨层)")
        print("  • 总GPU数:64 × 8 = 512个\n")
        
        print("ZeRO优化 (DeepSpeed):")
        print("  ZeRO-1: 优化器状态分片 (节省4×)")
        print("  ZeRO-2: + 梯度分片 (节省8×)")
        print("  ZeRO-3: + 参数分片 (节省N×,N=GPU数)")
        print("  → 1.5T参数模型可在512个GPU上训练\n")


trainer = LargeModelTrainingTechniques()
trainer.explain_mixed_precision()
trainer.explain_gradient_checkpointing()
trainer.explain_parallelism()

🎓 第十二章:总结与展望

12.1 Transformer核心要点回顾

def final_summary():
    """最终总结"""
    
    print("\n" + "="*70)
    print(" "*20 + "🎓 Transformer 核心要点总结")
    print("="*70 + "\n")
    
    summary = {
        "1️⃣ 自注意力机制": [
            "核心思想:动态计算序列元素间的关系",
            "公式:Attention(Q,K,V) = softmax(QK^T/√d_k)V",
            "优势:并行计算、长距离依赖、可解释性",
            "复杂度:O(n²·d) 是瓶颈"
        ],
        
        "2️⃣ 位置编码": [
            "问题:自注意力缺少位置信息",
            "方案:正弦位置编码 / 可学习位置编码",
            "作用:让模型区分不同位置的相同token"
        ],
        
        "3️⃣ 多头注意力": [
            "动机:学习不同类型的关系模式",
            "实现:并行计算多个注意力头",
            "效果:丰富表示能力"
        ],
        
        "4️⃣ 残差连接与层归一化": [
            "残差:缓解梯度消失,支持深层网络",
            "LayerNorm:稳定训练,适合变长序列",
            "组合:output = LN(x + Sublayer(x))"
        ],
        
        "5️⃣ 架构变体": [
            "Encoder-only (BERT):理解任务,双向注意力",
            "Decoder-only (GPT):生成任务,因果注意力",
            "Encoder-Decoder:翻译等seq2seq任务"
        ],
        
        "6️⃣ 扩展到多模态": [
            "ViT:图像视为patch序列",
            "CLIP:对比学习连接图像-文本",
            "Flamingo:交叉注意力融合多模态"
        ],
        
        "7️⃣ 效率优化": [
            "稀疏注意力:减少计算到O(n·log n)或O(n)",
            "线性注意力:核方法近似,O(n·d²)",
            "Flash Attention:IO优化,不改变算法"
        ],
        
        "8️⃣ 规模法则": [
            "性能 ∝ 模型大小^α",
            "涌现能力:超过阈值后突然出现新能力",
            "最优分配:模型和数据应等比例增长"
        ]
    }
    
    for key, points in summary.items():
        print(f"\n{key}")
        print("-" * 60)
        for point in points:
            print(f"  ✓ {point}")
    
    print("\n" + "="*70 + "\n")


def future_directions():
    """未来发展方向"""
    
    print("\n🔮 未来发展方向\n")
    
    directions = {
        "📈 更大规模": [
            "万亿参数模型 (10T+)",
            "多模态统一基座模型",
            "持续学习与终身学习"
        ],
        
        "⚡ 效率提升": [
            "稀疏激活 (MoE - Mixture of Experts)",
            "量化与剪枝",
            "神经架构搜索 (NAS)",
            "专用硬件加速"
        ],
        
        "🧠 能力增强": [
            "推理能力提升 (CoT, Tree-of-Thoughts)",
            "工具使用 (Plugin, Function Calling)",
            "多步规划与决策",
            "持续从反馈中学习 (RLHF)"
        ],
        
        "🔒 可靠性与安全": [
            "幻觉问题缓解",
            "对齐技术 (Alignment)",
            "可解释性增强",
            "鲁棒性与抗攻击"
        ],
        
        "🌍 应用拓展": [
            "科学研究 (蛋白质折叠、药物发现)",
            "代码生成与软件工程",
            "个性化教育助手",
            "具身智能 (Embodied AI)"
        ]
    }
    
    for direction, items in directions.items():
        print(f"{direction}")
        for item in items:
            print(f"  • {item}")
        print()
    
    print("="*70 + "\n")


def create_knowledge_graph():
    """创建知识图谱可视化"""
    
    print("\n📊 Transformer知识图谱\n")
    
    fig, ax = plt.subplots(figsize=(14, 10))
    
    # 核心节点
    core_topics = {
        'Transformer': (0.5, 0.5),
        'Self-Attention': (0.5, 0.7),
        'Multi-Head': (0.3, 0.7),
        'Position Encoding': (0.7, 0.7),
        'FFN': (0.3, 0.5),
        'Residual+LN': (0.7, 0.5),
        'BERT': (0.2, 0.3),
        'GPT': (0.5, 0.3),
        'ViT': (0.8, 0.3),
        'CLIP': (0.35, 0.15),
        'Efficient Attn': (0.65, 0.15),
        'Large Models': (0.5, 0.1)
    }
    
    # 节点分类
    categories = {
        'Core': ['Transformer', 'Self-Attention', 'Multi-Head', 'Position Encoding', 'FFN', 'Residual+LN'],
        'Architectures': ['BERT', 'GPT', 'ViT'],
        'Advanced': ['CLIP', 'Efficient Attn', 'Large Models']
    }
    
    colors_map = {
        'Core': '#FF6B6B',
        'Architectures': '#4ECDC4',
        'Advanced': '#95E1D3'
    }
    
    # 绘制节点
    for category, nodes in categories.items():
        for node in nodes:
            x, y = core_topics[node]
            circle = plt.Circle((x, y), 0.05, color=colors_map[category], 
                              ec='black', linewidth=2, zorder=3)
            ax.add_patch(circle)
            ax.text(x, y, node, ha='center', va='center', 
                   fontsize=9, fontweight='bold', zorder=4)
    
    # 连接关系
    connections = [
        ('Transformer', 'Self-Attention'),
        ('Transformer', 'FFN'),
        ('Transformer', 'Residual+LN'),
        ('Self-Attention', 'Multi-Head'),
        ('Self-Attention', 'Position Encoding'),
        ('Transformer', 'BERT'),
        ('Transformer', 'GPT'),
        ('Transformer', 'ViT'),
        ('BERT', 'CLIP'),
        ('ViT', 'CLIP'),
        ('Self-Attention', 'Efficient Attn'),
        ('GPT', 'Large Models'),
    ]
    
    for start, end in connections:
        x1, y1 = core_topics[start]
        x2, y2 = core_topics[end]
        ax.plot([x1, x2], [y1, y2], 'k-', alpha=0.3, linewidth=1.5, zorder=1)
    
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 0.85)
    ax.set_aspect('equal')
    ax.axis('off')
    ax.set_title('Transformer 技术演进图谱', fontsize=16, fontweight='bold', pad=20)
    
    # 添加图例
    from matplotlib.patches import Patch
    legend_elements = [
        Patch(facecolor=colors_map['Core'], label='核心组件'),
        Patch(facecolor=colors_map['Architectures'], label='架构变体'),
        Patch(facecolor=colors_map['Advanced'], label='前沿应用')
    ]
    ax.legend(handles=legend_elements, loc='upper right', fontsize=11)
    
    plt.tight_layout()
    plt.show()


def closing_remarks():
    """结束语"""
    
    print("\n" + "="*70)
    print(" "*25 + "🎉 结束语")
    print("="*70 + "\n")
    
    print("""
    Transformer的诞生,标志着深度学习进入新纪元。
    
    从2017年的"Attention is All You Need",到今天的GPT-4、DALL-E、
    AlphaFold,Transformer已成为AI领域最重要的架构。
    
    它的成功源于几个关键设计:
      • 自注意力:优雅地捕获序列关系
      • 并行化:充分利用现代硬件
      • 可扩展:性能随规模提升
      • 通用性:适用多种模态与任务
    
    然而,我们仍面临挑战:
      • O(n²)复杂度限制长序列
      • 计算成本与能耗问题
      • 幻觉与可靠性
      • 可解释性与安全性
    
    这些挑战也是机遇。未来的研究将聚焦于:
      ✓ 更高效的注意力机制
      ✓ 更强的推理与规划能力
      ✓ 多模态深度融合
      ✓ 可控、可靠、可解释的AI系统
    
    Transformer的故事还在继续...
    
    感谢您的学习!希望这份教程帮助您深入理解了Transformer的
    核心原理与前沿进展。
    
    Keep Learning, Keep Building! 🚀
    """)
    
    print("="*70 + "\n")


# 执行最终总结
final_summary()
future_directions()
create_knowledge_graph()
closing_remarks()

12.2 推荐学习资源

def recommended_resources():
    """推荐学习资源"""
    
    print("\n📚 推荐学习资源\n")
    
    resources = {
        "📖 必读论文": [
            "1. Attention Is All You Need (Vaswani et al., 2017)",
            "2. BERT (Devlin et al., 2018)",
            "3. GPT-3 (Brown et al., 2020)",
            "4. Vision Transformer (Dosovitskiy et al., 2020)",
            "5. Scaling Laws (Kaplan et al., 2020)",
            "6. CLIP (Radford et al., 2021)",
            "7. Chinchilla (Hoffmann et al., 2022)"
        ],
        
        "💻 代码实现": [
            "• HuggingFace Transformers (transformers库)",
            "• Annotated Transformer (Harvard NLP)",
            "• nanoGPT (Andrej Karpathy)",
            "• PyTorch官方教程"
        ],
        
        "🎓 在线课程": [
            "• Stanford CS224N (NLP with Deep Learning)",
            "• Stanford CS25 (Transformers United)",
            "• Fast.ai - From Deep Learning Foundations to Stable Diffusion",
            "• DeepLearning.AI - NLP Specialization"
        ],
        
        "📺 视频讲解": [
            "• 3Blue1Brown - Attention in transformers",
            "• Yannic Kilcher - Paper解读",
            "• Two Minute Papers"
        ],
        
        "🔧 实践工具": [
            "• Google Colab (免费GPU)",
            "• Weights & Biases (实验跟踪)",
            "• TensorBoard (可视化)"
        ]
    }
    
    for category, items in resources.items():
        print(f"{category}")
        for item in items:
            print(f"  {item}")
        print()
    
    print("="*70 + "\n")


recommended_resources()

print("\n✨ 完整教程到此结束!祝您在AI领域不断进步!✨\n")

完整文章已创建完毕! 🎉

本教程涵盖了:

  • ✅ 基础概念(注意力机制、位置编码)
  • ✅ 核心组件(多头注意力、FFN、残差连接)
  • ✅ 架构变体(GPT、BERT)
  • ✅ 多模态应用(ViT、CLIP)
  • ✅ 效率优化(稀疏注意力、线性注意力)
  • ✅ 大模型技术(Scaling Laws、训练技巧)
  • ✅ 总结与展望

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐