前言

本篇文章记录 CS336 作业 Assignment 2: Systems 中的 DDP Training 作业要求,仅供自己参考😄

Assignment 2https://github.com/stanford-cs336/assignment2-systems

referencehttps://chatgpt.com/

1. Distributed Data Parallel Training 作业要求

以下内容均翻译自 cs336_spring2025_assignment2_systems.pdf,请大家查看原文档获取更详细的内容

在本次作业的下一部分中,我们将探索 使用多张 GPU 训练语言模型的方法,重点关注 数据并行(data parallelism)。我们将首先介绍 PyTorch 中的分布式通信基础,然后研究一种 朴素的分布式数据并行训练实现,并在此基础上实现并基准测试多种用于提升通信效率的改进方法

1.1 Single-Node Distributed Communication in PyTorch

我们先来看一个在 PyTorch 中实现的 简单分布式示例程序,其目标是生成四个随机整数张量并计算它们的和。在下面的分布式示例中,我们会启动 4 个工作进程(worker processes),每个进程都会生成一个随机整数张量,为了对这些张量进行求和,我们会在各个进程之间调用 all-reduce 这一集合通信操作,该操作会用 all-reduce 的结果(即求和后的值)替换每个进程中原始的数据张量

下面我们来看一段示例代码:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def distributed_demo(rank, world_size):
    setup(rank, world_size)
    data = torch.randint(0, 10, (3,))
    print(f"rank {rank} data (before all-reduce): {data}")
    dist.all_reduce(data, async_op=False)
    print(f"rank {rank} data (after all-reduce): {data}")

if __name__ == "__main__":
    world_size = 4
    mp.spawn(fn=distributed_demo, args=(world_size,), nprocs=world_size, join=True)

在运行上述脚本之后,我们会得到如下输出:

$ uv run python distributed_hello_world.py
rank 3 data (before all-reduce): tensor([3, 7, 8])
rank 0 data (before all-reduce): tensor([4, 4, 7])
rank 2 data (before all-reduce): tensor([6, 0, 7])
rank 1 data (before all-reduce): tensor([9, 5, 3])
rank 1 data (after all-reduce): tensor([22, 16, 25])
rank 0 data (after all-reduce): tensor([22, 16, 25])
rank 3 data (after all-reduce): tensor([22, 16, 25])
rank 2 data (after all-reduce): tensor([22, 16, 25])

可以看到,正如预期的那样,每个 worker 进程在一开始都持有不同的数据张量。随后,在执行 all-reduce 操作之后,该操作会对所有 worker 进程中的张量进行求和,并将结果 就地(in-place)写回 到每一个 worker 进程中,使它们都保存相同的 all-reduce 结果

如果你多次运行这个脚本,你会注意到打印输出的顺序并不是确定的。这是因为该程序运行在分布式环境中,我们无法控制各个进程中命令的具体执行顺序,唯一可以保证的是:在 all-reduce 操作完成之后,各个进程最终持有的结果张量在位级别上是完全一致的

现在让我们更仔细地回顾一下前面的脚本,mp.spawn 会创建 nprocs 个进程,并使用给定的参数运行函数 fn。这里 fn 的函数签名是 fn(rank, *args),其中 rank 表示当前 worker 进程的编号,其取值范围是 0 到 nprocs - 1。因此,我们定义的 distributed_demo 函数必须将 rank 作为第一个位置参数,同时,我们还传入了 world_size,它表示 worker 进程的总数

每一个 worker 进程都属于同一个 进程组(process group),该进程组通过 dist.init_process_group 进行初始化,进程组表示一组能够通过共享的 master 节点进行协调和通信的 worker 进程。这里的 master 由其 IP 地址和端口号定义,并且 master 进程对应的 rank 为 0,诸如 all-reduce 这样的 集合通信操作(collective communication) 会在进程组中的每一个进程上执行

在这个示例中,我们使用的是 "gloo" 后端来初始化进程组,不过 PyTorch 还支持其他通信后端,尤其是 "nccl" 后端,它基于 NVIDIA 的 NCCL 集合通信库实现,通常在处理 CUDA 张量时具有更高的性能。然而,NCCL 只能运行在带有 GPU 的机器上,而 Gloo 则既可以运行在 CPU-only 机器上,也可以运行在 GPU 环境中

一个常见的经验法则是:

  • 分布式 GPU 训练 使用 NCCL
  • 分布式 CPU 训练本地开发与调试 使用 Gloo

在本示例中,我们选择 Gloo 是因为它允许在仅有 CPU 的机器上进行本地执行和开发

当在多 GPU 环境下运行时,请务必确保不同的 rank 使用不同的 GPU,一种常见的做法是在 setup 函数中调用 torch.cuda.set_device(rank),这样当你执行 tensor.to("cuda) 时,张量就会自动移动到对应 rank 的 GPU 上。另一种方式是显式地为每个 rank 构造对应的设备字符串(例如 device = f"cuda:{rank}"),并在进行数据迁移时显式指定目标设备(例如 tensor.to(f"cuda:{rank}")

Terminology.

在本次作业的后续部分(以及你在网上可能看到的其他资料中),你会在 PyTorch 分布式通信 的语境下遇到以下术语,尽管本作业主要关注 单机、多进程 的分布式训练,这些术语对于理解 分布式训练的一般概念 仍然非常有帮助,可参考 Figure 2 获取直观示意

node:网络中的一台机器

worker:参与分布式训练的一个程序实例。在本次作业中,每个 worker 只对应 一个进程,因此我们会将 worker、process 和 worker process 这几个术语交替使用。需要注意的是,在实际应用中,一个 worker 可能会使用多个进程(例如用于加载训练数据),因此这些术语在实践中并不总是严格等价的

global rank:一个介于 0 到 world_size - 1 之间的整数 ID,用于在进程组中 唯一标识一个 worker,例如,当 world size 为 2 时,一个进程的 global rank 为 0(主进程),另一个进程的 global rank 为 1

local world size:当应用运行在多个节点上时,local world size 指的是 单个节点上本地运行的 worker 数量,例如,如果一个应用在 2 个节点上各启动 4 个 worker,那么 world size 为 8,而 local world size 为 4。需要注意的是,当只在 单节点 上运行时,worker 的 local world size 等同于(global)word size

local rank:一个介于 0 到 local_world_size - 1 之间的整数 ID,用于唯一标识某台机器上的本地 worker,例如,如果一个应用在 2 个节点上各启动 4 各进程,那么每个节点上的 worker 的 local rank 分别为 0、1、2、3。同样需要注意的是,在 单节点多进程 的分布式应用中,一个进程的 local rank 与其 global rank 是等价的

Figure 2: A schematic representation of a distributed application running on 2 nodes with a world size of 8.

Note:每个 worker 进程都由一个全局 rank(global rank,范围为 0 到 7)和一个本地 rank(local rank,范围为 0 到 3)唯一标识,该图引自:lightning.ai/docs/fabric/stable/advanced/distributed_communication.html

1.1.1 Best Practices for Benchmarking Distributed Applications

在本作业的这一部分中,你将对分布式应用进行基准测试,以更好地理解 通信带来的开销,以下是一些推荐的最佳实践:

  • 在条件允许的情况下,尽量在同一台机器上运行基准测试,以便进行可控、可重复的对比实验
  • 在正式计时之前进行多次 warm-up(预热),这一点对于 NCCL 通信操作 尤为重要,通常进行 5 次预热迭代 就已经足够
  • 在 GPU 上做基准测试时,务必调用 torch.cuda.synchronize(),以确保所有 CUDA 操作真正执行完成。需要注意的是,即便在通信操作中使用了 async_op=False,这一步仍然是必要的,因为该调用只保证操作被 提交到 GPU 队列中,而不代表通信本身已经实际完成,更多背景与实现细节可参考:github.com/pytorch/pytorch/issues/68112#issuecomment-965932386
  • 不同 rank 之间的计时结果可能会有轻微差异,因此通常会对多个 rank 的测量结果进行聚合,以获得更稳定、准确的估计。你可能会发现 all-gather 集体通信操作(具体来说是 dist.all_gather_object 函数)在收集各个 rank 的结果时非常有用
  • 一般建议先在 CPU 上使用 Gloo 后端进行本地调试,然后在题目要求的情况下再切换到 GPU 上使用 NCCL 后端进行基准测试,在不同通信后端之间切换通常只需要修改 init_process_group 的调用方式以及张量的设备类型转换即可
Problem (distributed_communication_single_node): 5 points

编写一个脚本,用于在 单节点多进程(single-node multi-process) 设置下,对 all-reduce 操作的执行时间进行基准测试。上面给出的一个示例代码可以作为一个合理的起点,请在以下配置上进行实验,对比不同设置下的性能表现:

  • 后端 + 设备类型:Gloo + CPU,NCCL + GPU
  • all-reduce 数据规模:float32 数据张量,大小分别为 1MB、10MB、100MB、1GB
  • 进程数量:2、4 或 6 个进程
  • 资源限制:最多使用 6 张 GPU,每一次基准测试运行时间应控制在 5 分钟以内

Deliverable:请给出图表和表格,对比上述不同设置下的结果,并附上 2-3 句对实验结果的分析与讨论,说明各个因素(如后端类型、数据规模、进程数)之间是如何相互影响的。

1.2 A Naïve Implementation of Distributed Data Parallel Training

在前面我们已经了解了如何在 PyTorch 中编写分布式应用的基础,现在我们来构建一个 最小化的分布式数据并行(DDP)训练实现

数据并行会将一个 batch 切分并分发到多个设备(例如 GPU)上,从而支持使用单个设备无法容纳的大 batch size 进行训练,举例来说,假设有 4 个设备,每个设备最多只能处理 batch size 为 32 的数据,那么通过数据并行训练,我们就可以实现等效的 batch size 为 128 的训练

下面是进行 朴素分布式数据并行训练 的基本步骤:在初始化阶段,每个设备都会构建一个(随机初始化的)模型,我们使用 broadcast 这种集合通信操作,将模型参数从 rank 0 发送到所有其他 rank,训练开始时,每个设备都持有 完全一致的模型参数和优化器状态(例如 Adam 中累积的梯度统计量)

1. 数据切分

给定一个包含 n n n 个样本的 batch,batch 会被拆分,每个设备接收其中 n / d n/d n/d 个互不重叠的样本(其中 d d d 是用于数据并行训练的设备数量),这里要求 n n n 能被 d d d 整除,因为训练速度会受到最慢进程的限制。

2. 前向和反向传播

每个设备使用其本地的模型参数,在接收到的 n / d n/d n/d 个样本上执行前向传播,并进行反向传播以计算梯度,注意,此时每个设备只持有基于自身那部分样本计算得到的梯度。

3. 梯度聚合

接着,使用 all-reduce 集合通信操作,在不同设备之间对梯度进行平均,这样一来,每个设备最终都持有 基于全部 n n n 个样本的平均梯度

4. 参数更新

然后,每个设备各自执行一次优化器更新步骤,更新其本地的参数副本,从优化器的角度看,这只是对一个本地模型进行优化。由于所有设备在训练开始时拥有相同的参数和优化器状态,并且在每一步都使用相同的平均梯度进行更新,因此所有设备上的参数和优化器状态始终保持同步。

完成上述步骤后,就完成了一次完整的训练迭代,随后可以重复这一过程。

Problem (naive_ddp): 5 points

Deliverable:编写一个脚本,通过在反向传播之后对 各个参数的梯度进行 all-reduce 的方式,朴素地实现分布式数据并行(DDP)训练。为了验证你的 DDP 实现是否正确,请使用该脚本在 随机生成的数据 上训练一个 小型 toy 模型,并验证其训练得到的权重是否与 单进程训练 得到的结果一致。

Note:如果你在编写这个测试时遇到困难,可以参考 tests/test_ddp_individual_parameters.py

Problem (naive_ddp_benchmarking): 3 points

在这种 朴素(naïve)的 DDP 实现 中,每一次反向传播结束后,都会对 每一个参数的梯度在各个 rank 之间单独执行 all-reduce 操作。为了更好地理解数据并行训练所带来的 通信开销,请编写一个脚本,对你之前实现的语言模型在使用这种朴素 DDP 方案训练时的性能进行基准测试,你需要:

  • 测量 每一步训练的总耗时
  • 测量其中 用于梯度通信的时间占比

请在 单节点环境(1 个节点 x 2 块 GPU) 下进行测量,并使用 §1.1.2 中描述的 XL 模型规模(Table 1)作为测试对象

Deliverable:请给出你基准测试的设置说明,并报告在每种设置下的单次训练迭代耗时以及用于梯度通信的时间。

1.3 Improving Upon the Minimal DDP Implementation

我们在 §1.2 中看到的最小化 DDP 实现存在几个关键性的局限:

1. 每个参数张量都会单独执行一次 all-reduce 操作

每一次通信调用都会引入额外开销,因此,将多个通信操作进行 合并(batching) 以减少通信次数往往是更有利的做法。

2. 只有在整个反向传播结束后才开始进行梯度通信

然而,反向传播本身是 逐步(incremental)计算 的:当某个参数的梯度一旦计算完成,就可以立刻开始通信,而不必等待其他参数的梯度全部就绪。这样做可以将 梯度通信与反向计算重叠执行,从而降低分布式数据并行训练的整体通信开销。

在本作业的这一部分,我们将 依次解决上述每一个限制,并测量这些改进对训练速度所带来的影响

1.3.1 Reducing the Number of Communication Calls

与其为 每一个参数张量 分别发起一次通信调用,不如通过 合并(batching)all-reduce 操作 来提升性能。具体来说,我们可以将需要进行 all-reduce 的各个参数梯度先 拼接(concatenate)成一个大的张量,然后再对这个合并后的张量在所有 rank 上执行一次 all-reduce 操作

在实现时,可以考虑使用 torch._utils._flatten_dense_tensorstorch._utils._unflatten_dense_tensors 这两个工具函数,分别用于将多个梯度张量展平成一个张量,以及在通信完成后再还原回原始形状

Problem (minimal_ddp_flat_benchmarking): 2 points

修改你的最小化 DDP 实现,使其在通信阶段对 所有参数的梯度先进行展平并合并成一个张量,然后只进行 一次 batched all-reduce 通信。请将该实现的性能与之前的最小 DDP 实现进行对比,后者是在相同实验条件下 (1 个节点 x 2 块 GPU,XL 模型规模,§1.1.2 中描述的设置)对每个参数张量分别执行一次 all-reduce

Deliverable:报告每次训练迭代的耗时以及分布式数据并行训练中用于梯度通信的时间,并用 1-2 句话简要对比说明使用单次合并 all-reduce 通信与逐参数通信在性能上的差异。

1.3.2 Overlapping Computation with Communication of Individual Parameter Gradients

虽然通过 合并通信调用 可以减小大量小规模 all-reduce 操作所带来的调用开销,但通信本身所消耗的时间仍然会直接计入整体开销。为了解决这一问题,我们可以利用这样一个事实:反向传播是按层逐步计算梯度的(从 loss 开始,逐层向输入方向推进)。因此,一旦某一层的参数梯度计算完成,我们就可以 立即对该梯度发起 all-reduce 通信,而无需等待整个反向传播结束。

通过这种方式,可以将 反向传播的计算过程与梯度通信过程进行重叠(overlap),从而有效降低分布式数据并行训练的整体开销。在本节中,我们将首先实现并 benchmark 一个分布式数据并行(DDP)封装器:该封装器会在反向传播过程中,随着每个参数梯度的就绪,异步地对单个参数张量执行 all-reduce 操作

下面给出一些有用的实现提示:

Backward hooks

为了在某个参数的梯度在反向传播中 完成累积后自动触发一个函数调用,我们可以使用 register_post_accumulate_grad_hook 这个接口。该钩子函数会在参数梯度完成累积之后被调用,非常适合用于在梯度 “就绪” 的第一时间启动异步通信。更多关于该接口的说明和使用示例,请参考:pytorch.org/docs/stable/generated/torch.Tensor.register_post_accumulate_grad_hook.html

Asynchronous communication

所有 PyTorch 的 集合通信(collective communication)操作 都同时支持 同步执行async_op=False)和 异步执行async_op=True)。同步调用 会阻塞当前程序,直到该集合通信操作被 成功加入 GPU 的执行队列 为止,需要注意的是,这并不意味着 CUDA 操作已经完成,因为 CUDA 本身是异步执行的,也正因如此,后续的函数调用通常可以如预期那样正确使用该输出

Note:在更高级的使用场景中,如果你使用了多个 CUDA stream,可能需要在 stream 之间进行显式同步,以保证输出在后续操作中是可用的,请参考:pytorch.org/docs/stable/notes/cuda.html#cuda-streams

异步调用 则不同,它会在调用后 立即返回一个分布式请求句柄(distributed request handle),这意味着当函数返回时,集合通信操作 不一定已经被加入 GPU 执行队列,更不用说该通信操作已经完成了。如果你希望 等待通信操作被加入 GPU 执行队列(从而保证其结果可以被后续操作安全使用),可以对返回的通信句柄调用 handle.wait()

下面的两个示例展示了如何对一张张量列表中的每个张量执行 all-reduce 操作,分别使用同步方式和异步方式:

tensors = [torch.rand(5) for _ in range(10)]

# Synchronous, block until operation is queued on GPU.
for tensor in tensors:
    dist.all_reduce(tensor, async_op=False)

# Asynchronous, return immediately after each call and
# wait on results at the end.
handles = []
for tensor in tensors:
    handle = dist.all_reduce(tensor, async_op=True)
    handles.append(handle)

# ...
# Possibly execute other commands that don't rely on the all-reduce results
# ...

# Ensure that all-reduce calls were queued and
# therefore other operations depending on the
# all-reduce output can be queued.
for handle in handles:
    handle.wait()
handles.clear()
Problem (ddp_overlap_individual_parameters): 5 points

请实现一个 Python 类,用于处理 分布式数据并行(Distributed Data Parallel, DDP)训练,该类需要封装任意一个 PyTorch 的 nn.Module,并在训练开始前负责 广播模型权重(从而保证所有 rank 拥有相同的初始函数),以及在训练过程中 发起梯度平均所需的通信操作

我们建议你实现如下的公共接口:

  • __init__(self, module: torch.nn.Module):给定一个已经实例化的 PyTorch nn.Module,构造一个 DDP 容器,用于在不同 rank 之间处理梯度同步
  • forward(self, *inputs, **kwargs):使用给定的位置参数和关键字参数,调用被封装模块的 forward() 方法
  • finish_gradient_synchronization(self):当该函数被调用时,应等待所有 异步通信操作 完成,以确保梯度相关的通信已经正确地排队到 GPU 上

为了使用这个类进行分布式训练,我们会将一个模块传入该类进行封装,然后在调用 optimizer.step() 之前,显式调用一次 finish_gradient_synchronization(self) 以确保 依赖梯度的优化器更新操作 能够正确地被调度执行,使用示例如下:

model = ToyModel().to(device)
ddp_model = DDP(model)
for _ in range(train_steps):
    x, y = get_batch()
    logits = ddp_model(x)
    loss = loss_fn(logits, y)
    loss.backward()
    ddp_model.finish_gradient_synchronization()
    optimizer.step()

Deliverable:请实现一个用于分布式数据并行训练的 容器类,该类应当能够 将梯度通信与反向传播计算进行重叠(overlap),以减少通信带来的训练开销。为了测试你的 DDP 实现,你需要首先实现 [adapters.get_ddp_individual_parameters][adapters.ddp_individual_parameters_on_after_backward](该项为可选,取决于你的实现是否需要),然后运行测试:

uv run pytest tests/test_ddp_individual_parameters.py

我们建议你多次运行测试(例如 5 次),以确保实现具有足够的稳定性并能够可靠通过。

Problem (ddp_overlap_individual_parameters_benchmarking): 1 point

(a) 请对你的 DDP 实现进行性能基准测试,评估在 将反向传播计算与单个参数梯度的通信进行重叠(overlap) 时的训练性能,并将其与此前学习过的两组设置进行对比:

  • 最小化 DDP 实现:对每个参数张量分别执行一次 all-reduce
  • 批量通信实现:将所有参数张量拼接后执行一次 all-reduce

所有对比应在 相同的实验设置 下完成:1 个节点、2 块 GPU 以及 §1.1.2 中描述的 XL 模型规模

Deliverable:给出在 “反向传播与单参数梯度通信重叠” 条件下,每一次训练迭代的耗时,并用 1-2 句话对比并总结不同实现之间的性能差异。

(b) 请在你的基准测试代码中(同样使用 1 节点、2 GPU、XL 模型规模),引入 Nisight profiler,对以下两种实现进行对比分析:

  • 初始的 DDP 实现
  • 当前这种将反向计算与通信进行重叠的 DDP 实现

通过 可视化方式 对比两条执行轨迹,并提供 profiler 截图,清楚展示其中一种实现能够将计算与通信重叠,而另一种不能

Deliverable:提交两张 profiler 截图,一张来自初始 DDP 实现,一张来自支持计算与通信重叠的 DDP 实现,截图应能够直观展示通信操作是否与反向传播过程发生了重叠。

1.3.3 Overlapping Computation with Communication of Bucketed Parameter Gradients

在上一小节(§1.3.2)中,我们实现了 将反向传播计算与单个参数梯度的通信进行重叠。然而,我们之前也观察到,对通信调用进行 批处理(batching) 通常可以进一步提升性能,尤其是在参数张量数量很多的情况下(这在深度 Transformer 模型中非常常见)

在之前的实现中,通过批处理的方式,我们是 在反向传播全部完成之后一次性发送所有梯度,但这种方式的缺点是:它必须等待整个反向传播结果才能开始通信,从而错失了与计算重叠的机会

在本节中,我们将尝试 兼顾这两种方法的优势

  • 一方面,通过将参数组织成 多个 bucket(分桶) 来减少整体通信调用的次数
  • 另一方面,当每个 bucket 中的所有参数梯度都准备就绪时,就立即对该 bucket 执行 all-reduce 操作

这种策略使我们能够在 减少通信调用次数的同时,又能实现通信与计算的重叠,从而进一步提升分布式数据并行训练的整体效率

Problem (ddp_overlap_bucketed): 8 points

实现一个用于 分布式数据并行训练 的 Python 类,通过 梯度分桶(gradient bucketing)来提升通信效率。该类需要封装任意一个输入的 PyTorch nn.Module,并在训练开始前负责广播模型权重(确保所有 rank 拥有相同的初始参数),同时以 分桶的方式 发起梯度平均的通信操作

我们推荐使用如下公共接口:

  • def __init__(self, module: torch.nn.Module, bucket_size_mb: float):给定一个已经实例化的 PyTorch nn.Module 作为并行对象,构造一个 DDP 容器,用于在各个 rank 之间进行梯度同步。梯度同步应当以 bucket 的形式进行,每个 bucket 中包含的参数总大小不超过 bucket_size_mb(以 MB 计)
  • def forward(self, *inputs, **kwargs):使用给定的位置参数和关键字参数,调用被封装模型的 forward() 方法
  • def finish_gradient_synchronization(self):当该函数被调用时,应等待所有 异步通信操作 在 GPU 上完成排队(queued)

除了新增的 bucket_size_mb 初始化参数之外,该公共接口与之前实现的、对每个参数单独通信的 DDP 实现是保持一致的。我们建议按照 model.parameters()逆序 将参数分配到各个 bucket 中,因为在反向传播中,梯度大致会按照这个顺序逐步就绪

Deliverable:实现一个用于分布式数据并行训练的容器类,该类需要在反向传播计算过程中重叠梯度通信与计算,并且梯度通信必须采用 bucket 化的方式,以减少总体通信调度的次数。为了测试你的实现,请完成以下适配器函数:

  • [adapters.get_ddp_bucketed]
  • [adapters.ddp_bucketed_on_after_backward](该项为可选,取决于你的实现是否需要)
  • [adapters.ddp_bucketed_on_train_batch_start](该项为可选,取决于你的实现是否需要)

然后,通过运行以下命令执行测试:

uv run pytest tests/test_ddp.py

我们建议你多次运行测试(例如 5 次),以确保实现具有足够的稳定性并能够可靠通过。

Problem (ddp_bucketed_benchmarking): 3 points

(a) 在与前面实验相同的配置下(1 个节点、2 块 GPU、XL 模型规模),对你实现的 分桶(bucketed)DDP 版本进行性能基准测试,改变 最大 bucket 大小(例如:1、10、100、1000 MB),将这些结果与 未使用分桶机制 的实验结果进行比较 — 你的实验结果是否符合你的预期?如果不符合,原因可能是什么?你可能需要使用 PyTorch profiler 来更深入地理解通信调用是如何被调度和执行的。另外,你认为需要对实验设置做出哪些改变才能使结果更符合你的预期?

Deliverable:给出不同 bucket 大小下,每次训练迭代的测量时间,并用 3-4 句话对实验结果、你的预期以及任何不一致的现象的潜在原因进行说明。

(b) 假设计算一个 bucket 中梯度所需的时间与通信该梯度 bucket 所需的时间是 相同的,在此假设下,写出一个公式用来建模 DDP 的通信开销(即反向传播结束后所额外花费的时间)作为以下变量的函数:

  • s s s:模型参数的总大小(字节数)
  • w w w:all-reduce 算法的带宽,定义为每个 rank 传输的数据量除以完成一次 all-reduce 所需的时间
  • o o o:每一次通信调用所带来的固定开销(秒)
  • n b n_b nb:bucket 的数量

在此基础上,再推导出一个公式,用于给出 使 DDP 通信开销最小化的最优 bucket 大小

Deliverable:一个用于建模 DDP 通信开销的公式以及一个用于计算最优 bucket 大小的公式。

1.4 4D Parallelism

尽管在实现层面要复杂得多,但训练过程实际上可以沿着更多维度进行并行化,通常我们会讨论以下 5 种并行方式

  • 数据并行(Data Parallelism, DP):将一个 batch 得数据拆分到多个设备上,每个设备对其各自得数据子集计算梯度,随后需要以某种方式在设备之间对梯度进行聚合。
  • 全量参数分片数据并行(Fully-Sharded Data Parallelism, FSDP):将优化器状态、梯度以及模型权重本身拆分到不同设备上,如果只使用 DP 和 FSDP,那么在执行前向或反向传播之前,每个设备都需要先从其他设备收集所需的权重分片。
  • 张量并行(Tensor Parallelism, TP):沿着一个新的维度对激活值进行切分,每个设备只计算自己所负责分片的输出结果,在张量并行种,可以选择沿输入维度或输出维度对算子进行切分。TP 通常与 FSDP 结合使用,用于在相应维度上同时切分权重和激活。
  • 流水线并行(Pipeline Parallelism, PP):将模型按层划分为多个阶段(stage),每个阶段运行在不同的设备上,形成流水线执行。
  • 专家并行(Expert Parallelism, EP):在混合专家模型(Mixture-of-Experts)中,将不同的专家分配到不同设备上,每个设备只负责计算其对应专家的输出。

通常情况下,我们会 同时使用 FSDP 和 TP,因此可以将它们视为同一个并行维度,这样一来,我们最终就得到 4 个并行轴:DP、FSDP/TP、PP 和 EP。在本课程中,我们将主要关注 稠密模型(而非 MoE),因此不会进一步讨论 EP

在分析分布式训练时,我们常将设备集群描述为一个 设备网格(mesh),网格的各个轴对应我们所定义的并行维度。例如,如果我们有 16 块 GPU,而模型规模又远远大于单卡可容纳的大小,就可以将设备组织成一个 4x4 的 GPU 网格,其中第一维表示 数据并行(DP),第二维表示 FSDP + 张量并行(TP)

关于这些并行方式如何工作以及如何推导它们对应的通信与内存开销,可以参考《TPU Scaling Book》第 5 部分,这部分内容在后续问题中会非常有帮助。如果你希望更深入了解流水线并行,可以查阅《Ultra-Scale Playbook》附录,该书的其他章节中也包含大量可能对你有帮助的内容

Problem (communication_accounting): 10 points

考虑一个新的模型配置 XXL,其参数为:

  • d model = 16384 d_{\text{model}}=16384 dmodel=16384
  • d ff = 53248 d_{\text{ff}}=53248 dff=53248
  • num_blocks = 126 \text{num\_blocks}=126 num_blocks=126

由于该模型规模极大,绝大多数 FLOPs 都来自前馈网络(FFN),因此我们做出一些简化假设:

  • 忽略注意力层、输入嵌入层以及输出线性层
  • 假设每个 FFN 仅由 两个线性层 组成(忽略激活函数):
    • 第一层输入维度为 d model d_{\text{model}} dmodel,输出维度为 d ff d_{\text{ff}} dff
    • 第二层输入维度为 d ff d_{\text{ff}} dff,输出维度为 d model d_{\text{model}} dmodel
  • 模型由 num_blocks 个这样的 FFN block 组成
  • 不使用 activation checkpointing
  • 激活值与梯度通信使用 BF16
  • 累积梯度、主权重(master weights)以及优化器状态使用 FP32

(a)单个设备 上,以 FP32 精度存储 主模型权重、累积梯度、优化器状态 需要多少显存?同时回答反向传播阶段(这些张量使用 BF16)能节省多少显存?这些显存需求相当于多少张 H100 80GB GPU 的容量?

Deliverable:给出你的计算过程,并用一句话总结结论。

(b) 现在假设主权重、优化器状态、梯度以及一半的激活值(在实践中通常是每隔一层)被分片(shard)到 N FSDP N_{\text{FSDP}} NFSDP 个设备上,请写成 说明每个设备所需显存 的表达式,同时计算 N FSDP N_{\text{FSDP}} NFSDP 至少需要取多少才能使每个设备的总显存消耗 小于 1 个 v5p TPU(每设备 95GB)

Deliverable:你的计算过程加一句话总结。

(c) 给定 TPU v5p 的硬件参数(来自《TPU Scaling Book》):

  • 通信带宽: W ici = 2.9 × 10 10 W_{\text{ici}}=2.9\times 10^{10} Wici=2.9×1010
  • 计算吞吐: C = 4.6 × 10 14  FLOPs/s C=4.6 \times 10^{14} \text{ FLOPs/s} C=4.6×1014 FLOPs/s

并采用以下并行配置(同样遵循《TPU Scaling Book》的记号):

  • 设备 mesh: M X = 2 ,   M Y = 1    ( 2D mesh ) M_X = 2, \ M_Y=1 \ \ (\text{2D mesh}) MX=2, MY=1  (2D mesh)
  • FSDP 维度: X = 16 X=16 X=16
  • TP(Tensor Parallel)维度: Y = 4 Y=4 Y=4

请回答在该设置下,每个设备对应的 batch size 是多少时恰好是 compute-bound?此时的 总体 batch size 是多少?

Deliverable:给出你的计算过程,并用一句话总结结论。

(d) 在实际训练中,我们希望 总体 batch size 尽可能小,同时始终充分利用计算资源(即避免进入通信瓶颈状态),请回答:除了增大 batch size 之外,还有哪些技术手段可以在 保持高吞吐率 的同时,降低 batch size

Deliverable:一句话回答,并用文献引用和公式支持你的论述。

2. Optimizer State Sharding 作业要求

以下内容均翻译自 cs336_spring2025_assignment2_systems.pdf,请大家查看原文档获取更详细的内容

分布式数据并行训练在概念上相对简单,而且通常非常高效,但它要求 每个 rank 都持有一份完整的模型参数和优化器状态副本,这种冗余会带来显著的显存开销。例如 AdamW 优化器 为每个参数维护两个浮点数(一阶与二阶动量),这意味着仅优化器状态就需要 两倍于模型权重的显存[Rajbhandari+ 2020] 描述了多种在数据并行训练中降低这种冗余的方法,包括在不同 rank 之间对 优化器状态、梯度以及参数 进行分片并按需通信

在本作业的这一部分中,我们将实现一种 简化版的优化器状态分片 来降低每个 rank 的显存占用,具体来说:不再让每个 rank 为 所有参数 维护完整的优化器状态,而是每个 rank 的优化器实例只负责 一部分参数(大约是 1 / world_size 1/\text{world\_size} 1/world_size),当某个 rank 执行一次优化器更新时,它只会更新自己负责的那部分模型参数。随后,该 rank 会将 更新后的参数广播 给其他所有 rank,以确保在每一次优化器步之后,各个 rank 上的模型参数始终保持同步

Problem (optimizer_state_sharding): 15 points

实现一个用于 优化器状态分片(optimizer state sharding)的 Python 类,该类需要封装任意输入的 PyTorch nn.Module,并在每一次优化器 step 之后 负责同步更新后的参数

我们推荐使用如下公共接口:

  • def __init__(self, params, optimizer_cls: Type[Optimizer], **kwargs):初始化分片后的优化器状态。其中:
    • params:需要被优化的参数集合(或者参数组,如果用户希望对模型的不同部分使用不同的超参数,例如不同的学习率),这些参数将会在所有 rank 之间进行分片
    • optimizer_cls:要被封装的优化器类型,例如 optim.AdamW
    • 其余的关键字参数会被直接传递给 optimizer_cls 的构造函数,请确保在该方法中调用 torch.optim.Optimizer 的父类构造函数
  • def step(self, closure, **kwargs):使用给定的 closure 和关键字参数调用被封装优化器的 step 方法,在参数更新完成之后,需要与其他 rank 进行同步
  • def add_param_group(self, param_group: dict[str, Any]):向分片优化器中添加一个新的参数组。该方法会在分片优化器构造过程中,由父类构造函数调用,在训练过程中也可能被调用(例如模型逐层解冻时),因此,该方法需要能够处理 将模型参数分配到不同 rank 上 的逻辑

Deliverable:实现一个用于处理优化器状态分片的容器类,为了测试你的分片优化器实现,请先实现测试适配器 [adapters.adapters.get_sharded_optimizer],然后运行以下测试命令:

uv run pytest tests/test_sharded_optimizer.py

我们建议你多次运行测试(例如 5 次),以确保实现具有足够的稳定性并能够可靠通过。

Problem (optimizer_state_sharding_accounting): 5 points

现在我们已经实现了优化器状态分片(optimizer state sharding),接下来分析它在训练过程中对 峰值显存占用以及运行时开销 的影响

(a) 编写一个脚本,对比 使用与不使用优化器状态分片 时训练语言模型的 峰值显存占用。请在标准配置下进行测试(1 个节点、2 块 GPU、XL 模型规模),并分别报告在 模型初始化完成后、执行 optimizer step 之前以及执行 optimizer step 之后 这三个时刻的峰值显存占用,结果是否与你的预期一致?请进一步拆解每种设置下的显存使用情况,例如:参数占用多少显存、优化器占用多少显存等

Deliverable:用 2-3 句话总结峰值显存占用的结果,并说明显存在不同模型组件和优化器组件之间是如何分配的。

(b) 优化器状态分片会如何影响速度?在标准配置(1 个节点、2 块 GPU、XL 模型规模)下,分别测量 使用和不使用优化器状态分片 时,每个训练 iteration 所需的时间

Deliverable:用 2-3 句话给出你的计时结果。

(c) 我们实现的优化器状态分片方法与 ZeRO stage 1(在 [Rajbhandari+ 2020] 描述的 ZeRO-DP P o s P_{os} Pos)有何不同?请总结两者的差异,尤其是与 显存占用和通信量 相关的不同点

Deliverable:用 2-3 句话进行总结说明。

3. Epilogue

恭喜你完成本次作业!我们希望你觉得这次作业既有趣又有收获,并且在实现过程中对 如何通过提升单卡 GPU 性能以及利用多卡 GPU 来加速语言模型训练 有了更多的理解和实践体会。

结语

这篇文章我们系统性地梳理了 CS336 Assignment 2 中 Distributed Data Parallel(DDP)Training 的全部作业要求,从 PyTorch 分布式通信的基本概念出发,逐步覆盖了朴素 DDP、梯度通信开销分析、通信合并(flattening)、计算与通信重叠(overlap)、梯度分桶(bucketing)以及更大规模并行训练中常见的 4D 并行范式

下篇文章我们就来一起看看 DDP Training 具体该如何实现,敬请期待🤗

参考

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐