经典CNN模型(十三):EfficientNetV2(PyTorch详细注释版)
EfficientNet V2 是 EfficientNet 系列的第二代模型,由谷歌的研究人员在 2021 年的 ICML 会议上提出。EfficientNet V2 继承了EfficientNet V1的核心理念,即复合缩放方法(Compound Scaling),但在此基础上进行了多项改进,以实现更小的模型体积、更快的训练速度和更好的参数效率。
一. EfficientNet V2 神经网络介绍
EfficientNet V2 是 EfficientNet 系列的第二代模型,由谷歌的研究人员在 2021 年的 ICML 会议上提出。EfficientNet V2 继承了EfficientNet V1的核心理念,即复合缩放方法(Compound Scaling),但在此基础上进行了多项改进,以实现更小的模型体积、更快的训练速度和更好的参数效率。下面是对 EfficientNet V2 的一些关键特性的概述:
核心特点
-
复合缩放(Compound Scaling):
- 这是一种调整模型深度、宽度和分辨率的统一方法,以达到最佳性能和效率的平衡。EfficientNet V2 通过精细地调整这些超参数,实现了更高效的模型。
-
网络架构:
- 使用了 MobileNetv2 和 Inverted Residuals 的变体,其中包括 Fused-MBConv 层,这是一种更高效的 MBConv(倒残差卷积)块,它将深度可分离卷积替换为标准的卷积操作,以减少计算成本。
-
神经架构搜索(NAS):
- 效率更高的架构是通过神经架构搜索(NAS)找到的,这个过程自动确定最佳的网络配置,如卷积核大小、扩张率(expansion ratio)等,以达到特定的性能和效率目标。
-
渐进式学习(Progressive Learning):
- 这是一种训练策略,从低分辨率图像开始训练,然后逐步增加图像分辨率。这种方法有助于模型更快地收敛,并且可以减少训练时间。
-
自适应正则化(Adaptive Regularization):
- 在渐进式学习中,随着图像分辨率的增加,正则化强度也会相应增加。这可以通过调整数据增强策略,如 RandAugment 的强度来实现,从而避免过拟合。
训练细节
-
EfficientNet V2 使用了混合精度训练,即在训练过程中同时使用 FP32 和 FP16 浮点数,以加快训练速度并节省内存。
-
它还利用了大规模的数据集,如 ImageNet-21k,以及数据增强技术,如 CutMix 和 MixUp,以增强模型的泛化能力。
模型变体
EfficientNet V2 有多个变体,包括 S、M、L 等,每个变体都有不同的复杂度和性能。例如:
- S (Small):相对较小的模型,适用于边缘设备。
- M (Medium):中等大小的模型,提供良好的性能与效率平衡。
- L (Large):较大的模型,针对更高精度的任务。
如下图,展示了不同模型在训练时间和准确率上的对比。其中EfficientNetV2系列的表现非常出色,特别是在训练时间较长的情况下,如EffNetV2-XL(21k)和EffNetV2-L,它们分别达到了87%和86%的准确率。相比之下,其他模型如NFNet-F4和ViT-L/16(21k)虽然也有不错的表现,但准确率略低于EfficientNetV2系列。总的来说,EfficientNetV2在保持高效的同时还能获得很高的准确率。
在 EfficientNetV1 中作者关注的是准确率,参数数量以及 FLOPs(理论计算量小不代表推理速度快),在 EfficientNetV2 中作者进一步关注模型的训练速度。(其实我们更关心准确率和推理速度)。在下表中可见,V2 相比 V1 在训练时间和推理时间上都有较大的优势。


EfficientNetV2 值得关注的点在于两个方面:
- 采用新的网络模块:Fused-MBConv
- 采用渐进式学习策略,使得训练更快
二. EfficientNet V2 神经网络细节
EfficientNetV2 是 EfficientNet 系列的升级版,通过引入Fused MBConv块、优化的神经架构搜索及渐进式学习策略,实现了在计算效率和模型精度间的更佳平衡。
1. EfficientNetV1 中存在的问题
EfficientNetV1 存在的几个问题:
- 训练图像的尺寸很大时,训练速度非常慢
- 在网络浅层中使用 Depthwise convolutions 速度会很慢
- 同等的放大每个 stage 是次优的
1.1 训练图像的尺寸很大时,训练速度非常慢
见表 2,在 EfficientNet-B6 的实验中,当输入图像尺寸较大时,训练速度显著下降。例如,当输入尺寸为 380 时,batch size 设置为 12 时,V100 GPU 每秒只能处理 37 张图像,而当 batch size 增大到 24 时,每秒仅能处理 52 张图像。然而,当输入尺寸扩大至 512 时,GPU 内存不足,导致出现 “out of memory” 错误。由于批量归一化(BN)模块通常要求尽可能大的 batch size,但由于图像尺寸过大,无法做到这一点。因此,为了应对大尺寸图像训练速度慢的问题,自然而然地想到了减小训练图像尺寸,这样不仅能加快训练速度,还允许使用更大的 batch size。

1.2 在网络浅层中使用 Depthwise convolutions 速度会很慢
在 EfficientNetV2 中,作者观察到 MBConv 模块在深层网络中的 Depthwise Convolution 速度较慢,原因在于当前硬件对 DW Conv 的加速支持有限。为解决这个问题,作者提出了 Fused-MBConv。简单来说,就是将 MBConv 模块中的 1x1 和 Depthwise Conv 融合成一个 3x3 卷积操作。实验结果显示(见表 3),只在早期阶段使用 Fused MBConv 的效果更好,而非全网替换。最后,作者通过 NAS 技术找到了最优解,即只替换前三个 stage 的 MBConv。


1.3 同等的放大每个 stage 是次优的
在 EfficientNetV1 中,每个 stage 的深度和宽度被同等放大,只需简单地乘以相应的缩放因子即可。然而,不同 stage 对模型的训练速度和参数量的影响并不一致,因此作者采用了非均匀的缩放策略。具体策略并未详细说明,只是给出了各模型对应的参数值。

2. EfficientNetV2 的贡献
在 EfficientNetV2 中,研究者做出了以下贡献:
- 提出了全新的 EfficientNetV2 网络,相较于之前的网络,其在训练速度和参数量上有明显优势。
- 设计了改进的渐进式学习方法,能够根据训练图像的尺寸动态调节正则化方法,如 Dropout、Rand Augment和Mixup,从而提高了训练速度和准确性。
- 实验结果表明,EfficientNetV2 的训练速度比之前的一些网络快了约 11 倍,参数量仅为原来的 1/6.8。

3. Fuse-MBConv 模块
让我们深入了解一下 Fused-MBConv 模块。虽然论文中有 SE 模块,但在实际代码实现中并不存在。当 expansion 等于1时,主分支包含一个 3x3 卷积,紧随其后的是 BN 和 SILU 激活函数,还有一个 Dropout。如果 expansion 大于 1,则主分支首先应用一个升维的 3x3 卷积,接着是 BN 和 SILU 激活函数,然后是一个 1x1 卷积,同样伴随 BN 和 Dropout。只有在 stride 为 1 并且输入特征矩阵的通道数与主分支输出特征矩阵的通道数相同时,才会存在 shortcut 连接。此外,只有在存在 shortcut 连接的情况下,才会应用 Dropout。

在 EfficientNetV2 中,由于批量归一化(BN)和 dropout 一起使用可能会有问题,所以这里采用了一种名为 “Deep Networks with Stochastic Depth” 的方法。它实际上是以一定概率完全丢弃主分支的输出,即直接使用上一层的输出。这使得网络具有随机性,因为没有这个 block 就相当于网络的深度减小了一层。在 EfficientNetV2 中,失活概率从 0 增加到 0.2。这样做可以提高训练速度并稍微提高精度。需要注意的是,这里的 dropout 仅指 Fused-MBConv 和 MBConv 中的 dropout,不包括最后一层全连接前的 dropout。

4. 渐进式学习策略
作者进行了一个实验,对于不同的训练输入尺寸,使用不同强度的 RandAug 可以达到最佳效果。于是作者思考是否应该在使用不同训练输入尺寸时选择不同的正则化方法。

训练早期使用较小的训练尺寸以及较弱的正则方法 weak regularization, 这
样网络能够快速的学习到一些简单的表达能力。 接着逐渐提升图像尺寸,
同时增强正则方法 adding stronger regularization。 这里所说的 regularization
包括 Dropout, RandAugment 以及 Mixup。

下表中给出了正则化强度如何随着图像尺寸变化的算法,其本质是个线性插值(线性变换)。

下表中给出了针对每一个模型使用的不同的图像训练尺寸以及正则化强度的变化范围。min 指 epoch 1,而 max 指 epoch 5。

最后作者为了证明渐进式学习策略的有效性,作者在 ResNet 以及 EfficientNetV1 上也进行了实验,括号中是限定的最大尺寸。可见达到相同正确率的时候,训练时间大幅缩小,所以其是具有普适性的。

三. EfficientNet V2 神经网络结构
以下是 EfficientNetV2-S 架构的详细描述,包括 MBConv 和 Fused-MBConv 块:
| Stage | Operator | Stride | #Channels | #Layers |
|---|---|---|---|---|
| 0 | Conv3x3 | 2 | 24 | 1 |
| 1 | Fused-MBConv1, k3x3 | 1 | 24 | 2 |
| 2 | Fused-MBConv4, k3x3 | 2 | 48 | 4 |
| 3 | Fused-MBConv4, k3x3 | 2 | 64 | 4 |
| 4 | MBConv4, k3x3, SE0.25 | 2 | 128 | 6 |
| 5 | MBConv6, k3x3, SE0.25 | 1 | 160 | 9 |
| 6 | MBConv6, k3x3, SE0.25 | 2 | 256 | 15 |
| 7 | Conv1x1 & Pooling & FC | - | 1280 | 1 |
这张图片显示的是 EfficientNetV2-S 架构的一个表格,其中列出了各个阶段的运算符、步长、通道数和层数。具体来说:
- 阶段 0:使用一个 3x3 的卷积操作,步长为 2,生成 24 个通道,只有一层;
- 阶段 1:融合的移动卷积块 1,核大小为 3x3,步长为 1,生成 24 个通道,有两层;
- 阶段 2:融合的移动卷积块 4,核大小为 3x3,步长为 2,生成 48 个通道,有四层;
- 阶段 3:融合的移动卷积块 4,核大小为 3x3,步长为 2,生成 64 个通道,有四层;
- 阶段 4:移动卷积块 4,核大小为 3x3,步长为 2,生成 128 个通道,有 6 层,还带有 0.25 的 SE 模块;
- 阶段 5:移动卷积块 6,核大小为 3x3,步长为 1,生成 160 个通道,有 9 层,还带有 0.25 的 SE 模块;
- 阶段 6:移动卷积块 6,核大小为 3x3,步长为 2,生成 256 个通道,有 15 层,还带有 0.25 的 SE 模块;
- 阶段 7:1x1 的卷积、池化及全连接操作,没有指定步长,生成 1280 个通道,只有一层。
源码中的配置
在 EfficientNet V2 的配置文件 effnetv2_configs.py 中,定义了一系列用于构建模型的参数。例如,‘r2_k3_s1_e1_i24_o24_c1’ 表示一个基础块,其中:
- ‘r’ 代表当前 Stage 中 Operator 重复堆叠的次数,这里是 2 次;
- ‘k’ 代表内核大小(kernel_size),这里是 3;
- ‘s’ 代表步幅(stride),这里是 1;
- ‘e’ 代表扩张比例(expansion ratio),这里是 1;
- ‘i’ 代表输入通道(input channels),这里是 24;
- ‘o’ 代表输出通道(output channels),这里是 24;
- ‘c’ 代表卷积类型(conv_type),1 代表 Fused-MBConv,0 代表 MBConv(默认为MBConv);
- ‘se’ 代表是否使用 SE 模块,以及 se_ratio。
因此,该配置表示一个 Fused-MBConv 块,其中内核大小为 3x3,步幅为 1,扩张比率为 1,输入和输出通道均为 24,并且包含了 SE 模块。

1. EfficientNetV2-S 的配置如下:
- ‘r2_k3_s1_e1_i24_o24_c1’: 表示 Operator 重复堆叠 2 次,内核大小为 3,步幅为 1,扩张比率为 1,输入通道为 24,输出通道为 24,使用 Fused-MBConv。
- ‘r4_k3_s2_e4_i24_o48_c1’: 表示 Operator 重复堆叠 4 次,内核大小为 3,步幅为 2,扩张比率为 4,输入通道为 24,输出通道为 48,使用 Fused-MBConv。
- ‘r4_k3_s2_e4_i48_o64_c1’: 表示 Operator 重复堆叠 4 次,内核大小为 3,步幅为 2,扩张比率为 4,输入通道为 48,输出通道为 64,使用 Fused-MBConv。
- ‘r6_k3_s2_e4_i64_o128_se0.25’: 表示 Operator 重复堆叠 6 次,内核大小为 3,步幅为 2,扩张比率为 4,输入通道为 64,输出通道为 128,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。
- ‘r9_k3_s1_e6_i128_o160_se0.25’: 表示 Operator 重复堆叠 9 次,内核大小为 3,步幅为 1,扩张比率为 6,输入通道为 128,输出通道为 160,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。
- ‘r15_k3_s2_e6_i160_o256_se0.25’: 表示 Operator 重复堆叠 15 次,内核大小为 3,步幅为 2,扩张比率为 6,输入通道为 160,输出通道为 256,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。

2. EfficientNetV2-M 的配置如下:
- ‘r3_k3_s1_e1_i24_o24_c1’: 表示 Operator 重复堆叠 3 次,内核大小为 3,步幅为 1,扩张比率为 1,输入通道为 24,输出通道为 24,使用 Fused-MBConv。
- ‘r5_k3_s2_e4_i24_o48_c1’: 表示 Operator 重复堆叠 5 次,内核大小为 3,步幅为 2,扩张比率为 4,输入通道为 24,输出通道为 48,使用 Fused-MBConv。
- ‘r5_k3_s2_e4_i48_o80_c1’: 表示 Operator 重复堆叠 5 次,内核大小为 3,步幅为 2,扩张比率为 4,输入通道为 48,输出通道为 80,使用 Fused-MBConv。
- ‘r7_k3_s2_e4_i80_o160_se0.25’: 表示 Operator 重复堆叠 7 次,内核大小为 3,步幅为 2,扩张比率为 4,输入通道为 80,输出通道为 160,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。
- ‘r14_k3_s1_e6_i160_o176_se0.25’: 表示 Operator 重复堆叠 14 次,内核大小为 3,步幅为 1,扩张比率为 6,输入通道为 160,输出通道为 176,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。
- ‘r18_k3_s2_e6_i176_o304_se0.25’: 表示 Operator 重复堆叠 18 次,内核大小为 3,步幅为 2,扩张比率为 6,输入通道为 176,输出通道为 304,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。
- ‘r5_k3_s1_e6_i304_o512_se0.25’: 表示 Operator 重复堆叠 5 次,内核大小为 3,步幅为 1,扩张比率为 6,输入通道为 304,输出通道为 512,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。

3. EfficientNetV2-L 的配置如下:
- ‘r4_k3_s1_e1_i32_o32_c1’: 表示 Operator 重复堆叠 4 次,内核大小为 3,步幅为 1,扩张比率为 1,输入通道为 32,输出通道为 32,使用 Fused-MBConv。
- ‘r7_k3_s2_e4_i32_o64_c1’: 表示 Operator 重复堆叠 7 次,内核大小为 3,步幅为 2,扩张比率为 4,输入通道为 32,输出通道为 64,使用 Fused-MBConv。
- ‘r7_k3_s2_e4_i64_o96_c1’: 表示 Operator 重复堆叠 7 次,内核大小为 3,步幅为 2,扩张比率为 4,输入通道为 64,输出通道为 96,使用 Fused-MBConv。
- ‘r10_k3_s2_e4_i96_o192_se0.25’: 表示 Operator 重复堆叠 10 次,内核大小为 3,步幅为 2,扩张比率为 4,输入通道为 96,输出通道为 192,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。
- ‘r19_k3_s1_e6_i192_o224_se0.25’: 表示 Operator 重复堆叠 19 次,内核大小为 3,步幅为 1,扩张比率为 6,输入通道为 192,输出通道为 224,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。
- ‘r25_k3_s2_e6_i224_o384_se0.25’: 表示 Operator 重复堆叠 25 次,内核大小为 3,步幅为 2,扩张比率为 6,输入通道为 224,输出通道为 384,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。
- ‘r7_k3_s1_e6_i384_o640_se0.25’: 表示 Operator 重复堆叠 7 次,内核大小为 3,步幅为 1,扩张比率为 6,输入通道为 384,输出通道为 640,使用 Fused-MBConv,并且包含 SE 模块,se_ratio 为 0.25。

源码中给的一些训练参数如下所示,例如对于 EfficientNetV2-S,训练输入尺寸为 300(最大训练尺寸为 300,实际训练的时候每张图是会变的),验证尺寸为固定的 384。这里 Dropout 对应的是最后全连接层之前的那个。后面的 randaug,mixup 以及 aug 则是针对渐进式学习使用到的超参数。

四. EfficientNet V2 代码实现
开发环境配置说明:本项目使用 Python 3.6.13 和 PyTorch 1.10.2 构建,适用于CPU环境。
- model.py:定义网络模型
- train.py:加载数据集并训练,计算 loss 和 accuracy,保存训练好的网络参数
- predict.py:用自己的数据集进行分类测试
- utils.py:依赖脚本
- my_dataset.py:依赖脚本
- model.py
from collections import OrderedDict
from functools import partial
from typing import Callable, Optional
import torch.nn as nn
import torch
from torch import Tensor
def drop_path(x, drop_prob: float = 0., training: bool = False):
"""
Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
"Deep Networks with Stochastic Depth", https://arxiv.org/pdf/1603.09382.pdf
This function is taken from the rwightman.
It can be seen here:
https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/layers/drop.py#L140
"""
if drop_prob == 0. or not training:
return x
keep_prob = 1 - drop_prob
shape = (x.shape[0],) + (1,) * (x.ndim - 1) # work with diff dim tensors, not just 2D ConvNets
random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
random_tensor.floor_() # binarize
output = x.div(keep_prob) * random_tensor
return output
class DropPath(nn.Module):
"""
Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
"Deep Networks with Stochastic Depth", https://arxiv.org/pdf/1603.09382.pdf
"""
def __init__(self, drop_prob=None):
super(DropPath, self).__init__()
self.drop_prob = drop_prob
def forward(self, x):
return drop_path(x, self.drop_prob, self.training)
class ConvBNAct(nn.Module):
def __init__(self,
in_planes: int,
out_planes: int,
kernel_size: int = 3,
stride: int = 1,
groups: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
activation_layer: Optional[Callable[..., nn.Module]] = None):
super(ConvBNAct, self).__init__()
padding = (kernel_size - 1) // 2
if norm_layer is None:
norm_layer = nn.BatchNorm2d
if activation_layer is None:
activation_layer = nn.SiLU # alias Swish (torch>=1.7)
self.conv = nn.Conv2d(in_channels=in_planes,
out_channels=out_planes,
kernel_size=kernel_size,
stride=stride,
padding=padding,
groups=groups,
bias=False)
self.bn = norm_layer(out_planes)
self.act = activation_layer()
def forward(self, x):
result = self.conv(x)
result = self.bn(result)
result = self.act(result)
return result
class SqueezeExcite(nn.Module):
def __init__(self,
input_c: int, # block input channel
expand_c: int, # block expand channel
se_ratio: float = 0.25):
super(SqueezeExcite, self).__init__()
squeeze_c = int(input_c * se_ratio)
self.conv_reduce = nn.Conv2d(expand_c, squeeze_c, 1)
self.act1 = nn.SiLU() # alias Swish
self.conv_expand = nn.Conv2d(squeeze_c, expand_c, 1)
self.act2 = nn.Sigmoid()
def forward(self, x: Tensor) -> Tensor:
scale = x.mean((2, 3), keepdim=True)
scale = self.conv_reduce(scale)
scale = self.act1(scale)
scale = self.conv_expand(scale)
scale = self.act2(scale)
return scale * x
class MBConv(nn.Module):
def __init__(self,
kernel_size: int,
input_c: int,
out_c: int,
expand_ratio: int,
stride: int,
se_ratio: float,
drop_rate: float,
norm_layer: Callable[..., nn.Module]):
super(MBConv, self).__init__()
if stride not in [1, 2]:
raise ValueError("illegal stride value.")
self.has_shortcut = (stride == 1 and input_c == out_c)
activation_layer = nn.SiLU # alias Swish
expanded_c = input_c * expand_ratio
# 在EfficientNetV2中,MBConv中不存在expansion=1的情况所以conv_pw肯定存在
assert expand_ratio != 1
# Point-wise expansion
self.expand_conv = ConvBNAct(input_c,
expanded_c,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=activation_layer)
# Depth-wise convolution
self.dwconv = ConvBNAct(expanded_c,
expanded_c,
kernel_size=kernel_size,
stride=stride,
groups=expanded_c,
norm_layer=norm_layer,
activation_layer=activation_layer)
self.se = SqueezeExcite(input_c, expanded_c, se_ratio) if se_ratio > 0 else nn.Identity()
# Point-wise linear projection
self.project_conv = ConvBNAct(expanded_c,
out_planes=out_c,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=nn.Identity) # 注意这里没有激活函数,所有传入Identity
self.out_channels = out_c
# 只有在使用shortcut连接时才使用dropout层
self.drop_rate = drop_rate
if self.has_shortcut and drop_rate > 0:
self.dropout = DropPath(drop_rate)
def forward(self, x: Tensor) -> Tensor:
result = self.expand_conv(x)
result = self.dwconv(result)
result = self.se(result)
result = self.project_conv(result)
if self.has_shortcut:
if self.drop_rate > 0:
result = self.dropout(result)
result += x
return result
class FusedMBConv(nn.Module):
def __init__(self,
kernel_size: int,
input_c: int,
out_c: int,
expand_ratio: int,
stride: int,
se_ratio: float,
drop_rate: float,
norm_layer: Callable[..., nn.Module]):
super(FusedMBConv, self).__init__()
assert stride in [1, 2]
assert se_ratio == 0
self.has_shortcut = stride == 1 and input_c == out_c
self.drop_rate = drop_rate
self.has_expansion = expand_ratio != 1
activation_layer = nn.SiLU # alias Swish
expanded_c = input_c * expand_ratio
# 只有当expand ratio不等于1时才有expand conv
if self.has_expansion:
# Expansion convolution
self.expand_conv = ConvBNAct(input_c,
expanded_c,
kernel_size=kernel_size,
stride=stride,
norm_layer=norm_layer,
activation_layer=activation_layer)
self.project_conv = ConvBNAct(expanded_c,
out_c,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=nn.Identity) # 注意没有激活函数
else:
# 当只有project_conv时的情况
self.project_conv = ConvBNAct(input_c,
out_c,
kernel_size=kernel_size,
stride=stride,
norm_layer=norm_layer,
activation_layer=activation_layer) # 注意有激活函数
self.out_channels = out_c
# 只有在使用shortcut连接时才使用dropout层
self.drop_rate = drop_rate
if self.has_shortcut and drop_rate > 0:
self.dropout = DropPath(drop_rate)
def forward(self, x: Tensor) -> Tensor:
if self.has_expansion:
result = self.expand_conv(x)
result = self.project_conv(result)
else:
result = self.project_conv(x)
if self.has_shortcut:
if self.drop_rate > 0:
result = self.dropout(result)
result += x
return result
class EfficientNetV2(nn.Module):
def __init__(self,
model_cnf: list,
num_classes: int = 1000,
num_features: int = 1280,
dropout_rate: float = 0.2,
drop_connect_rate: float = 0.2):
super(EfficientNetV2, self).__init__()
for cnf in model_cnf:
assert len(cnf) == 8
norm_layer = partial(nn.BatchNorm2d, eps=1e-3, momentum=0.1)
stem_filter_num = model_cnf[0][4]
self.stem = ConvBNAct(3,
stem_filter_num,
kernel_size=3,
stride=2,
norm_layer=norm_layer) # 激活函数默认是SiLU
total_blocks = sum([i[0] for i in model_cnf])
block_id = 0
blocks = []
for cnf in model_cnf:
repeats = cnf[0]
op = FusedMBConv if cnf[-2] == 0 else MBConv
for i in range(repeats):
blocks.append(op(kernel_size=cnf[1],
input_c=cnf[4] if i == 0 else cnf[5],
out_c=cnf[5],
expand_ratio=cnf[3],
stride=cnf[2] if i == 0 else 1,
se_ratio=cnf[-1],
drop_rate=drop_connect_rate * block_id / total_blocks,
norm_layer=norm_layer))
block_id += 1
self.blocks = nn.Sequential(*blocks)
head_input_c = model_cnf[-1][-3]
head = OrderedDict()
head.update({"project_conv": ConvBNAct(head_input_c,
num_features,
kernel_size=1,
norm_layer=norm_layer)}) # 激活函数默认是SiLU
head.update({"avgpool": nn.AdaptiveAvgPool2d(1)})
head.update({"flatten": nn.Flatten()})
if dropout_rate > 0:
head.update({"dropout": nn.Dropout(p=dropout_rate, inplace=True)})
head.update({"classifier": nn.Linear(num_features, num_classes)})
self.head = nn.Sequential(head)
# initial weights
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out")
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.zeros_(m.bias)
def forward(self, x: Tensor) -> Tensor:
x = self.stem(x)
x = self.blocks(x)
x = self.head(x)
return x
def efficientnetv2_s(num_classes: int = 1000):
"""
EfficientNetV2
https://arxiv.org/abs/2104.00298
"""
# train_size: 300, eval_size: 384
# repeat, kernel, stride, expansion, in_c, out_c, operator, se_ratio
model_config = [[2, 3, 1, 1, 24, 24, 0, 0],
[4, 3, 2, 4, 24, 48, 0, 0],
[4, 3, 2, 4, 48, 64, 0, 0],
[6, 3, 2, 4, 64, 128, 1, 0.25],
[9, 3, 1, 6, 128, 160, 1, 0.25],
[15, 3, 2, 6, 160, 256, 1, 0.25]]
model = EfficientNetV2(model_cnf=model_config,
num_classes=num_classes,
dropout_rate=0.2)
return model
def efficientnetv2_m(num_classes: int = 1000):
"""
EfficientNetV2
https://arxiv.org/abs/2104.00298
"""
# train_size: 384, eval_size: 480
# repeat, kernel, stride, expansion, in_c, out_c, operator, se_ratio
model_config = [[3, 3, 1, 1, 24, 24, 0, 0],
[5, 3, 2, 4, 24, 48, 0, 0],
[5, 3, 2, 4, 48, 80, 0, 0],
[7, 3, 2, 4, 80, 160, 1, 0.25],
[14, 3, 1, 6, 160, 176, 1, 0.25],
[18, 3, 2, 6, 176, 304, 1, 0.25],
[5, 3, 1, 6, 304, 512, 1, 0.25]]
model = EfficientNetV2(model_cnf=model_config,
num_classes=num_classes,
dropout_rate=0.3)
return model
def efficientnetv2_l(num_classes: int = 1000):
"""
EfficientNetV2
https://arxiv.org/abs/2104.00298
"""
# train_size: 384, eval_size: 480
# repeat, kernel, stride, expansion, in_c, out_c, operator, se_ratio
model_config = [[4, 3, 1, 1, 32, 32, 0, 0],
[7, 3, 2, 4, 32, 64, 0, 0],
[7, 3, 2, 4, 64, 96, 0, 0],
[10, 3, 2, 4, 96, 192, 1, 0.25],
[19, 3, 1, 6, 192, 224, 1, 0.25],
[25, 3, 2, 6, 224, 384, 1, 0.25],
[7, 3, 1, 6, 384, 640, 1, 0.25]]
model = EfficientNetV2(model_cnf=model_config,
num_classes=num_classes,
dropout_rate=0.4)
return model
- train.py
import os
import math
import argparse
import torch
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
import torch.optim.lr_scheduler as lr_scheduler
from model import efficientnetv2_s as create_model
from my_dataset import MyDataSet
from utils import read_split_data, train_one_epoch, evaluate
def main(args):
device = torch.device(args.device if torch.cuda.is_available() else "cpu")
print(args)
print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
tb_writer = SummaryWriter()
if os.path.exists("./weights") is False:
os.makedirs("./weights")
train_images_path, train_images_label, val_images_path, val_images_label = read_split_data(args.data_path)
img_size = {"s": [300, 384], # train_size, val_size
"m": [384, 480],
"l": [384, 480]}
num_model = "s"
data_transform = {
"train": transforms.Compose([transforms.RandomResizedCrop(img_size[num_model][0]),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
"val": transforms.Compose([transforms.Resize(img_size[num_model][1]),
transforms.CenterCrop(img_size[num_model][1]),
transforms.ToTensor(),
transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])}
# 实例化训练数据集
train_dataset = MyDataSet(images_path=train_images_path,
images_class=train_images_label,
transform=data_transform["train"])
# 实例化验证数据集
val_dataset = MyDataSet(images_path=val_images_path,
images_class=val_images_label,
transform=data_transform["val"])
batch_size = args.batch_size
nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8]) # number of workers
print('Using {} dataloader workers every process'.format(nw))
train_loader = torch.utils.data.DataLoader(train_dataset,
batch_size=batch_size,
shuffle=True,
pin_memory=True,
num_workers=nw,
collate_fn=train_dataset.collate_fn)
val_loader = torch.utils.data.DataLoader(val_dataset,
batch_size=batch_size,
shuffle=False,
pin_memory=True,
num_workers=nw,
collate_fn=val_dataset.collate_fn)
# 如果存在预训练权重则载入
model = create_model(num_classes=args.num_classes).to(device)
if args.weights != "":
if os.path.exists(args.weights):
weights_dict = torch.load(args.weights, map_location=device)
load_weights_dict = {k: v for k, v in weights_dict.items()
if model.state_dict()[k].numel() == v.numel()}
print(model.load_state_dict(load_weights_dict, strict=False))
else:
raise FileNotFoundError("not found weights file: {}".format(args.weights))
# 是否冻结权重
if args.freeze_layers:
for name, para in model.named_parameters():
# 除head外,其他权重全部冻结
if "head" not in name:
para.requires_grad_(False)
else:
print("training {}".format(name))
pg = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=1E-4)
# Scheduler https://arxiv.org/pdf/1812.01187.pdf
lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf # cosine
scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
for epoch in range(args.epochs):
# train
train_loss, train_acc = train_one_epoch(model=model,
optimizer=optimizer,
data_loader=train_loader,
device=device,
epoch=epoch)
scheduler.step()
# validate
val_loss, val_acc = evaluate(model=model,
data_loader=val_loader,
device=device,
epoch=epoch)
tags = ["train_loss", "train_acc", "val_loss", "val_acc", "learning_rate"]
tb_writer.add_scalar(tags[0], train_loss, epoch)
tb_writer.add_scalar(tags[1], train_acc, epoch)
tb_writer.add_scalar(tags[2], val_loss, epoch)
tb_writer.add_scalar(tags[3], val_acc, epoch)
tb_writer.add_scalar(tags[4], optimizer.param_groups[0]["lr"], epoch)
torch.save(model.state_dict(), "./weights/model-{}.pth".format(epoch))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--num_classes', type=int, default=5)
parser.add_argument('--epochs', type=int, default=1)
parser.add_argument('--batch-size', type=int, default=8)
parser.add_argument('--lr', type=float, default=0.01)
parser.add_argument('--lrf', type=float, default=0.01)
# 数据集所在根目录
# https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz
parser.add_argument('--data-path', type=str,
default="E:/code/PyCharm_Projects/deep_learning/data_set/flower_data/flower_photos")
# download model weights
# 链接: https://pan.baidu.com/s/1uZX36rvrfEss-JGj4yfzbQ 密码: 5gu1
parser.add_argument('--weights', type=str, default='./pre_efficientnetv2-s.pth',
help='initial weights path')
parser.add_argument('--freeze-layers', type=bool, default=True)
parser.add_argument('--device', default='cuda:0', help='device id (i.e. 0 or 0,1 or cpu)')
opt = parser.parse_args()
main(opt)
- predict.py
import os
import json
import torch
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
from model import efficientnetv2_s as create_model
def main():
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
img_size = {"s": [300, 384], # train_size, val_size
"m": [384, 480],
"l": [384, 480]}
num_model = "s"
data_transform = transforms.Compose(
[transforms.Resize(img_size[num_model][1]),
transforms.CenterCrop(img_size[num_model][1]),
transforms.ToTensor(),
transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
# load image
img_path = "郁金香.png"
assert os.path.exists(img_path), "file: '{}' dose not exist.".format(img_path)
img = Image.open(img_path)
plt.imshow(img)
# [N, C, H, W]
img = data_transform(img)
# expand batch dimension
img = torch.unsqueeze(img, dim=0)
# read class_indict
json_path = './class_indices.json'
assert os.path.exists(json_path), "file: '{}' dose not exist.".format(json_path)
with open(json_path, "r") as f:
class_indict = json.load(f)
# create model
model = create_model(num_classes=5).to(device)
# load model weights
model_weight_path = "./weights/model-29.pth"
model.load_state_dict(torch.load(model_weight_path, map_location=device))
model.eval()
with torch.no_grad():
# predict class
output = torch.squeeze(model(img.to(device))).cpu()
predict = torch.softmax(output, dim=0)
predict_cla = torch.argmax(predict).numpy()
print_res = "class: {} prob: {:.3}".format(class_indict[str(predict_cla)],
predict[predict_cla].numpy())
plt.title(print_res)
for i in range(len(predict)):
print("class: {:10} prob: {:.3}".format(class_indict[str(i)],
predict[i].numpy()))
plt.show()
if __name__ == '__main__':
main()
- utils.py
import os
import sys
import json
import pickle
import random
import torch
from tqdm import tqdm
import matplotlib.pyplot as plt
def read_split_data(root: str, val_rate: float = 0.2):
random.seed(0) # 保证随机结果可复现
assert os.path.exists(root), "dataset root: {} does not exist.".format(root)
# 遍历文件夹,一个文件夹对应一个类别
flower_class = [cla for cla in os.listdir(root) if os.path.isdir(os.path.join(root, cla))]
# 排序,保证各平台顺序一致
flower_class.sort()
# 生成类别名称以及对应的数字索引
class_indices = dict((k, v) for v, k in enumerate(flower_class))
json_str = json.dumps(dict((val, key) for key, val in class_indices.items()), indent=4)
with open('class_indices.json', 'w') as json_file:
json_file.write(json_str)
train_images_path = [] # 存储训练集的所有图片路径
train_images_label = [] # 存储训练集图片对应索引信息
val_images_path = [] # 存储验证集的所有图片路径
val_images_label = [] # 存储验证集图片对应索引信息
every_class_num = [] # 存储每个类别的样本总数
supported = [".jpg", ".JPG", ".png", ".PNG"] # 支持的文件后缀类型
# 遍历每个文件夹下的文件
for cla in flower_class:
cla_path = os.path.join(root, cla)
# 遍历获取supported支持的所有文件路径
images = [os.path.join(root, cla, i) for i in os.listdir(cla_path)
if os.path.splitext(i)[-1] in supported]
# 排序,保证各平台顺序一致
images.sort()
# 获取该类别对应的索引
image_class = class_indices[cla]
# 记录该类别的样本数量
every_class_num.append(len(images))
# 按比例随机采样验证样本
val_path = random.sample(images, k=int(len(images) * val_rate))
for img_path in images:
if img_path in val_path: # 如果该路径在采样的验证集样本中则存入验证集
val_images_path.append(img_path)
val_images_label.append(image_class)
else: # 否则存入训练集
train_images_path.append(img_path)
train_images_label.append(image_class)
print("{} images were found in the dataset.".format(sum(every_class_num)))
print("{} images for training.".format(len(train_images_path)))
print("{} images for validation.".format(len(val_images_path)))
assert len(train_images_path) > 0, "number of training images must greater than 0."
assert len(val_images_path) > 0, "number of validation images must greater than 0."
plot_image = False
if plot_image:
# 绘制每种类别个数柱状图
plt.bar(range(len(flower_class)), every_class_num, align='center')
# 将横坐标0,1,2,3,4替换为相应的类别名称
plt.xticks(range(len(flower_class)), flower_class)
# 在柱状图上添加数值标签
for i, v in enumerate(every_class_num):
plt.text(x=i, y=v + 5, s=str(v), ha='center')
# 设置x坐标
plt.xlabel('image class')
# 设置y坐标
plt.ylabel('number of images')
# 设置柱状图的标题
plt.title('flower class distribution')
plt.show()
return train_images_path, train_images_label, val_images_path, val_images_label
def plot_data_loader_image(data_loader):
batch_size = data_loader.batch_size
plot_num = min(batch_size, 4)
json_path = './class_indices.json'
assert os.path.exists(json_path), json_path + " does not exist."
json_file = open(json_path, 'r')
class_indices = json.load(json_file)
for data in data_loader:
images, labels = data
for i in range(plot_num):
# [C, H, W] -> [H, W, C]
img = images[i].numpy().transpose(1, 2, 0)
# 反Normalize操作
img = (img * [0.229, 0.224, 0.225] + [0.485, 0.456, 0.406]) * 255
label = labels[i].item()
plt.subplot(1, plot_num, i+1)
plt.xlabel(class_indices[str(label)])
plt.xticks([]) # 去掉x轴的刻度
plt.yticks([]) # 去掉y轴的刻度
plt.imshow(img.astype('uint8'))
plt.show()
def write_pickle(list_info: list, file_name: str):
with open(file_name, 'wb') as f:
pickle.dump(list_info, f)
def read_pickle(file_name: str) -> list:
with open(file_name, 'rb') as f:
info_list = pickle.load(f)
return info_list
def train_one_epoch(model, optimizer, data_loader, device, epoch):
model.train()
loss_function = torch.nn.CrossEntropyLoss()
accu_loss = torch.zeros(1).to(device) # 累计损失
accu_num = torch.zeros(1).to(device) # 累计预测正确的样本数
optimizer.zero_grad()
sample_num = 0
data_loader = tqdm(data_loader, file=sys.stdout)
for step, data in enumerate(data_loader):
images, labels = data
sample_num += images.shape[0]
pred = model(images.to(device))
pred_classes = torch.max(pred, dim=1)[1]
accu_num += torch.eq(pred_classes, labels.to(device)).sum()
loss = loss_function(pred, labels.to(device))
loss.backward()
accu_loss += loss.detach()
data_loader.desc = "[train epoch {}] loss: {:.3f}, acc: {:.3f}".format(epoch,
accu_loss.item() / (step + 1),
accu_num.item() / sample_num)
if not torch.isfinite(loss):
print('WARNING: non-finite loss, ending training ', loss)
sys.exit(1)
optimizer.step()
optimizer.zero_grad()
return accu_loss.item() / (step + 1), accu_num.item() / sample_num
@torch.no_grad()
def evaluate(model, data_loader, device, epoch):
loss_function = torch.nn.CrossEntropyLoss()
model.eval()
accu_num = torch.zeros(1).to(device) # 累计预测正确的样本数
accu_loss = torch.zeros(1).to(device) # 累计损失
sample_num = 0
data_loader = tqdm(data_loader, file=sys.stdout)
for step, data in enumerate(data_loader):
images, labels = data
sample_num += images.shape[0]
pred = model(images.to(device))
pred_classes = torch.max(pred, dim=1)[1]
accu_num += torch.eq(pred_classes, labels.to(device)).sum()
loss = loss_function(pred, labels.to(device))
accu_loss += loss
data_loader.desc = "[valid epoch {}] loss: {:.3f}, acc: {:.3f}".format(epoch,
accu_loss.item() / (step + 1),
accu_num.item() / sample_num)
return accu_loss.item() / (step + 1), accu_num.item() / sample_num
- my_dataset.py
在from PIL import Image
import torch
from torch.utils.data import Dataset
class MyDataSet(Dataset):
"""自定义数据集"""
def __init__(self, images_path: list, images_class: list, transform=None):
self.images_path = images_path
self.images_class = images_class
self.transform = transform
def __len__(self):
return len(self.images_path)
def __getitem__(self, item):
img = Image.open(self.images_path[item])
# RGB为彩色图片,L为灰度图片
if img.mode != 'RGB':
raise ValueError("image: {} isn't RGB mode.".format(self.images_path[item]))
label = self.images_class[item]
if self.transform is not None:
img = self.transform(img)
return img, label
@staticmethod
def collate_fn(batch):
# 官方实现的default_collate可以参考
# https://github.com/pytorch/pytorch/blob/67b7e751e6b5931a9f45274653f4f653a4e6cdf6/torch/utils/data/_utils/collate.py
images, labels = tuple(zip(*batch))
images = torch.stack(images, dim=0)
labels = torch.as_tensor(labels)
return images, labels
五. 参考内容
更多推荐


所有评论(0)