参考文献:

Writing Distributed Applications with PyTorch — PyTorch Tutorials 2.0.1+cu117 documentation

33、完整讲解PyTorch多GPU分布式训练代码编写_哔哩哔哩_bilibili

pytorch进程间通信 - 文举的博客 (liwenju0.com)

前言

2023年,训练模型时,使用DDP(DistributedDataParallel)已经成为Pytorch炼丹师的标准技能。本文主要讲述实现Pytorch分布式要做哪些事情,以及如何理解Pytorch分布式训练背后的通信原理(不会很深入)。

分布式训练流程

单机多卡训练流程

算法工程师实现单机多卡训练流程的思维导图如下:

 其中 init_process_proup(初始化进程组) 是实现整个训练的前提,其中比较重要的 world_size、rank 等参数的意义 我们后面会讲解,而像 backend 参数(指定通信库),如算法工程师没有进行指定,则 pytorch做出默认选择(在较低版本的 pytorch,如V1.0.0中,backend参数为必传)。

多机多卡训练流程

代码编写单机多卡一致,可复用代码。

Pytorch进程间通信

DDP 本身是依赖 torch.distributed 提供的进程间通信能力。所以理解torch.distributed提供的进程间通信的原理,对理解DDP的运行机制有很大的帮助。官方的tutorial中,实现了依靠torch.distributed实现DDP功能的demo代码,学习一下,很有裨益。

这部分,其实就两件事儿,建立进程组和实现进程组之间的通信。

创建进程组

说到底,就是建立多个进程,并且将这些进程归并到一起,成为一个group,在group内,每个进程一个id,用于标识自己。

建立多个进程,归根到底,还是一个进程一个进程建立。 那我们想,建立一个进程时,需要怎么做才能实现进程间的寻址呢。 torch.distributed给我们答案是四个参数

  • MASTER_ADDR
  • MASTER_PORT
  • WORLD_SIZE
  • RANK

MASTER_PORT和MASTER_ADDR的目的是告诉进程组中负责进程通信协调的核心进程的IP地址和端口。当然如果该进程就是核心进程,它会发现这就是自己。 RANK参数是该进程的id,WORLD_SIZE是说明进程组中进程的个数。

了解以上这些知识,就可以看一下创建进程组的代码:

"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, size):
    """ Distributed function to be implemented later. """
    pass

def init_process(rank, size, fn, backend='nccl'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size) #这段代码就是将该进程加入到进程组中的核心代码
    fn(rank, size)


if __name__ == "__main__":
    size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

观察代码,可以看到, MASTER_ADDR 和 MASTER_PORT 是通过在代码中设置环境变量传递给 torch.distributed 的, RANK 和 WORLD_SIZE 是通过参数传递的,其实也可以通过设置环境变量的方式传递。如下方法:

    # 一般分布式GPU训练使用nccl后端,分布式CPU训练使用gloo后端
    # init_method的值设为"env://",表示需要的四组参数信息都在环境变量里取。
    dist.init_process_group(backend="nccl", init_method="env://")

上面代码中的run就是在初始化好进程组之后执行的函数,这里之所以传入rank 和size,是想在执行过程中根据不同的rank,来给不同的进程赋予不同的行为,比如,日志只在rank==0的进程中打印等。实际上,如果已经初始化了进程组,也可以通过如下两个函数获取相应的值,避免在函数中传递这两个参数。

def get_world_size() -> int:
    """Return the number of processes in the current process group."""
    if not dist.is_available() or not dist.is_initialized():
        return 1
    return dist.get_world_size()


def get_rank() -> int:
    """Return the rank of the current process in the current process group."""
    if not dist.is_available() or not dist.is_initialized():
        return 0
    return dist.get_rank()

这段代码是官方给的demo,看过之后,不免有些疑惑。这个代码似乎只适用于单机多卡的情况。 对于多机多卡的情况,在不同的机器上执行这个代码,MASTER_PORT 和MASTER_ADDR不用变,WORLD_SIZE需要调整为4,因为我们的代码每台机器上都启动两个进程, RANK这个时候就会发生冲突,不同的机器上的进程有相同的编号。解决方法就是在执行初始化函数时,传递一个NODE_RANK和NPROC_PER_NODE的参数,通过NODE_RANK和NPROC_PER_NODE计算出各个进程的RANK值,就可以保证不冲突了。 代码示例如下。

for r in range(NPROC_PER_NODE):
  RANK = NODE_RANK*NPROC_PER_NODE + r

实际上,torch已经将上面这些计算过程帮我们封装好了。代码如下所示:

python -m torch.distributed.launch \
            --master_port 12355 \ #主节点的端口
            --nproc_per_node=8 \ #每个节点启动的进程数
            --nnodes=nnodes  \ #节点总数
            --node_rank=1  \  # 当前节点的rank
            --master_addr=master_addr \ #主节点的ip地址
            --use_env \ #在环境变量中设置LOCAL_RANK
            train.py

使用这段代码启动train.py时,原先的

    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(size)
    # 一般分布式GPU训练使用nccl后端,分布式CPU训练使用gloo后端
    dist.init_process_group(backend="nccl", init_method="env://")
    fn(rank, size)

可以简单改成:

dist.init_process_group(backend="nccl", init_method="env://")
fn()

需要的四个环境变量参数,torch.distributed.launch都会帮我们设置好。fn中需要rank和size的地方,使用上面的两个便利函数即可。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐