好的,我们来用中文详细解释一下这个执行流程。

这个架构是分布式计算中非常经典的 “控制器-工作者”(Controller-Worker)模式 的一种高效实现。

为了方便理解,我们可以使用一个生动的比喻:指挥家和交响乐团

核心关系:Cluster vs. Worker

  • Worker (演奏家): Worker 是一个独立的、实际执行计算任务的单元。它是一个 ray.Actor,意味着它是一个有自己状态的独立进程。它持有模型的一部分,与指定的GPU绑定,并负责执行具体的计算,比如模型推理、训练、参数更新等。它知道自己的身份(比如它是乐团的第几号小提琴手),但通常不关心整个乐团的宏观调度。

  • Cluster (指挥家): Cluster 是一个逻辑上的控制器,它管理着一组 Worker(整个乐团)。它自己不执行任何繁重的计算(指挥家自己不拉小提琴),而是负责编排和指挥所有的 Worker。用户的代码主要与 Cluster 交互。当你想让整个团队做某件事时,你向 Cluster 下达指令,它会把指令精准地分发给相应的 Worker

下面是它们角色的总结:

方面 Worker (执行者) Cluster (协调者)
本质 一个 ray.Actor,即一个独立的、有状态的进程。 一个普通的Python对象,但它持有对所有 Worker Actor的远程引用(ActorHandles)。
职责 - 执行计算任务(如 model_update)。
- 管理自身状态(模型权重、优化器等)。
- 与分配给它的硬件(GPU/NPU)交互。
- 创建、配置并将 Worker Actor放置到指定的硬件上。
- 提供统一的API,向全部或部分Worker分派命令。
- 从 Worker 收集并聚合结果。
感知范围 只知道自己的 rank 和总 world_size 了解所有 Worker,知道它们的 rank,并能作为一个整体与它们通信。
任务示例 self.strategy.model_update(): 运行一个前向/反向传播。 my_cluster.start_model_update(): 告诉每一个Worker去执行它们各自的model_update方法。

详细执行流程(分步解析)

让我们追踪从创建到执行命令的完整生命周期。

第一步:Cluster 的初始化(组建乐团)

这是“指挥家”组建并安排“乐团”的过程。

  1. 用户代码: 你首先创建一个 Cluster 实例。

    # 简化示例
    train_cluster = Cluster(
        name="trainer",
        worker_cls=MyTrainingWorker,
        resource_manager=my_resource_manager,
        worker_config=my_worker_config
    )
    
  2. 资源分配 (ResourceManager):

    • Cluster__init__ 方法会立即调用 resource_manager.allocate_placement_group()
    • ResourceManager 之前已经通过 ray.util.placement_group 在Ray集群上预留了资源。PlacementGroup 是Ray的一个特性,它能保证一组资源(比如“某个节点上的8个GPU”)被整体预留,不会被其他任务抢占。这个过程就像在物理机房里“圈地”。
    • allocate_placement_group 将逻辑上的 world_size(例如,需要16个 Worker)映射到物理硬件上。它会返回每个 Worker 所需的资源规格列表,例如:
      # 对于 rank=0 的 Worker,它需要使用0号节点上的0号GPU
      [
          [ {'node_rank': 0, 'gpu_rank': 0, 'placement_group': <PG_object_for_node0>} ], # Worker 0 的资源
          [ {'node_rank': 0, 'gpu_rank': 1, 'placement_group': <PG_object_for_node0>} ], # Worker 1 的资源
          ...
      ]
      
  3. 创建 Worker (Cluster._create_workers):

    • Clusterrank = 0 循环到 world_size - 1
    • 对于每一个 rank:
      • 它准备一套环境变量(RANK, WORLD_SIZE, MASTER_ADDR, CUDA_VISIBLE_DEVICES等)。这是每个 Worker 进程启动后了解自己**“身份ID”**和如何加入分布式通信组(如 torch.distributed)的方式。
      • 它使用 self.worker_cls.options(...).remote(...) 来创建 Worker Actor。
      • 最关键的一步是,.options() 调用中传入了 scheduling_strategy,并指定了从 ResourceManager 获取的 PlacementGroup。这精确地告诉Ray:“请把这个 Worker Actor启动到我指定的那个节点的那块GPU上”。
      • remote() 调用返回一个 ActorHandle(对 Worker 的远程引用),这个引用被存放在 self.workers 列表中。
  4. 方法绑定 (Cluster._bind_worker_method):

    • 这是最“神奇”的部分,实现了一种动态代理
    • Cluster 检查 worker_clsWorker类)的所有方法,寻找那些被 @register 装饰器标记过的方法(例如 Worker 中的 load_states)。
    • 对于每个找到的方法,它在 Cluster 实例上动态创建一个同名的新方法(例如 cluster.load_states)。
    • 这个新方法是一个代理。当你调用它时,它并不会执行 Cluster 自己的代码,而是执行预先配置好的分发逻辑,这个逻辑知道如何将这个调用请求转发给所有实际的 Worker Actor。

初始化结束后,Cluster 对象就准备好了。它管理着一个运行在指定GPU上的 Worker Actor列表,并且拥有了一套可以直接调用的代理方法,随时准备向 Worker 们下达指令。

第二步:执行一个命令(开始演奏)

现在,我们看看当你调用 Cluster 上的一个方法时会发生什么。

  1. 用户代码: 你调用了上一步动态绑定的代理方法。

    # 这个调用是在 Cluster 对象上发起的
    results = train_cluster.start_model_update(...)
    

    这里以 ModelUpdateGroup 中的调用为例。

  2. 代理方法被触发:

    • 这个调用触发了为 start_model_update 方法生成的代理函数。
    • 该函数会读取 Worker 类中对应方法上的 @register 装饰器信息。例如,load_states 方法的装饰器是 @register(dispatch_mode=Dispatch.ONE_TO_ALL)
    • 代理函数根据 dispatch_mode 知道应该如何分发这个调用。ONE_TO_ALL 意味着需要把这个调用发给所有Worker
  3. 分发和远程执行 (Cluster.execute_all_async):

    • 代理函数内部会调用像 self.execute_all_async("start_model_update", *args, **kwargs) 这样的辅助函数。
    • execute_all_async 遍历它持有的 Worker 句柄列表 (self.workers)。
    • 对于列表中的每一个 worker,它发起一个远程调用:worker.start_model_update.remote(*args, **kwargs)
    • 这个 .remote() 调用是异步的。它会立即返回一个 ray.ObjectRef(可以理解为一个“提货单”或“future”),而不会等待远程任务完成。
    • execute_all_async 最终返回一个由这些“提货单”组成的列表。
  4. 结果收集 (ray.get):

    • ModelUpdateGroup.model_update 中的代码展示了最后一步:
      # ray.get 会阻塞程序,直到所有远程任务完成
      dataprotos: list[DataProto] = ray.get(...)
      
    • ray.get() 接收这个“提货单”列表,然后阻塞程序的执行,直到所有远程 Worker 的任务都执行完毕。
    • 一旦所有 Worker 都返回了结果,ray.get() 就会将所有结果收集起来,返回一个包含每个 Worker 返回值的真实列表(例如,一个由 DataProto 对象组成的列表)。
    • 最后,可以对这些来自不同 Worker 的结果进行聚合,例如 reduce_metrics_list

流程总结图

渲染错误: Mermaid 渲染失败: Trying to inactivate an inactive participant (ray)

总而言之,这个架构将**编排(Cluster执行(Worker)**完美地分离开来,利用Ray强大的底层分布式能力,实现了一个清晰、可扩展且功能强大的分布式计算框架。

cluster 中 port干嘛用的,runtime_env = RuntimeEnv(env_vars=env_vars), 什么时创建多个worker,worker 中的port 又是干嘛的?

Cluster中Port和Worker创建机制详解

Cluster中的Port用途

Cluster中的master_port主要用于分布式通信协调。在Cluster._create_workers()中,rank为0的worker会获取主节点地址和端口,并共享给所有其他worker 1

if rank == 0:
    self.master_addr, self.master_port = ray.get(worker.get_master_addr_and_port.remote())

这个端口是PyTorch分布式进程组通信的关键组件,用于worker间的参数同步和数据交换。

RuntimeEnv的作用

RuntimeEnv(env_vars=env_vars)是Ray的运行时环境配置,用于为每个worker设置必要的环境变量 2

runtime_env = RuntimeEnv(env_vars=env_vars)
worker_options = {
    "runtime_env": runtime_env,
    # ... 其他配置
}

环境变量包括:

  • WORLD_SIZE:总worker数量
  • RANK:当前worker的rank
  • MASTER_ADDR/MASTER_PORT:主节点通信地址
  • CUDA_VISIBLE_DEVICES:可见GPU设备

多Worker创建时机

多个worker的创建由world_size参数决定,该参数通过以下公式计算 3

world_size = len(device_mapping) // num_gpus_per_worker

Cluster._create_workers()中,系统会为每个rank创建一个worker实例 4

Worker中的Port用途

Worker中的port主要用于分布式通信初始化。在Worker.__init__()中,rank为0的worker会:

  1. 获取本机IP地址作为主节点地址 5
  2. 分配一个空闲端口作为通信端口
  3. 将地址和端口存储到共享存储中供其他worker使用
if self.rank == 0:
    master_addr = self.get_node_ip()
    master_port = str(self.get_free_port())
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = master_port

get_free_port()方法确保端口唯一性,避免冲突 6

通信流程

SharedStorage Worker(rank>0) Worker(rank=0) Cluster SharedStorage Worker(rank>0) Worker(rank=0) Cluster 创建worker实例 获取node_ip和free_port 存储MASTER_ADDR/PORT 创建worker实例 获取MASTER_ADDR/PORT 设置分布式通信参数

Notes

  • Cluster中的port是分布式通信的枢纽,确保所有worker能够建立通信连接
  • RuntimeEnv是Ray提供的环境隔离机制,保证每个worker获得正确的运行环境
  • Worker数量由资源配置自动计算,支持灵活的GPU分配策略
  • 端口分配机制确保多进程环境下的通信唯一性

Wiki pages you might want to explore:

Citations

File: roll/distributed/executor/cluster.py (L100-145)

        for rank, pgs in enumerate(placement_groups):
            deploy_pg = pgs[0]
            pg_zero_gpu_ranks = sorted([pg["gpu_rank"] for pg in pgs if pg["node_rank"] == deploy_pg["node_rank"]])
            worker_name = f"{self.cluster_name}-{rank}"
            env_vars = {
                "WORLD_SIZE": str(self.world_size),
                "RANK": str(rank),
                "LOCAL_RANK": str(0),
                "CLUSTER_NAME": self.cluster_name,
                "WORKER_NAME": worker_name,
            }

            if rank != 0:
                env_vars["MASTER_ADDR"] = self.master_addr
                env_vars["MASTER_PORT"] = str(self.master_port)
            if deploy_pg["gpu_rank"] is not None:
                current_platform.update_env_vars_for_visible_devices(env_vars=env_vars, gpu_ranks=pg_zero_gpu_ranks)
            if "ROLL_LOG_DIR" in os.environ:
                env_vars["ROLL_LOG_DIR"] = os.environ["ROLL_LOG_DIR"]
            env_vars.update(self.worker_config.system_envs)

            runtime_env = RuntimeEnv(env_vars=env_vars)
            self.worker_config.resource_placement_groups = pgs

            worker_options = {
                "scheduling_strategy": PlacementGroupSchedulingStrategy(placement_group=deploy_pg["placement_group"]),
                "name": worker_name,
                "namespace": RAY_NAMESPACE,
                "runtime_env": runtime_env,
                "num_cpus": 0.01,
            }

            if current_platform.ray_device_key == "GPU":
                worker_options.update({"num_gpus": 0.01 if self.worker_config.device_mapping else 0})
            elif current_platform.ray_device_key == "NPU":
                worker_options.update(
                    {
                        "num_gpus": 0,
                        "resources": {
                            current_platform.ray_device_key: 0.01 if self.worker_config.device_mapping else 0
                        },
                    }
                )

            worker = self.worker_cls.options(**worker_options).remote(worker_config=self.worker_config)
            self.workers.append(worker)

File: roll/distributed/executor/cluster.py (L146-148)

            if rank == 0:
                self.master_addr, self.master_port = ray.get(worker.get_master_addr_and_port.remote())

File: docs_roll/docs/User Guides/Configuration/device_mapping.md (L80-84)

The number of workers (`world_size`) is automatically calculated based on the `device_mapping` and `num_gpus_per_worker` parameters:

```python
world_size = len(device_mapping) // num_gpus_per_worker

**File:** roll/distributed/executor/worker.py (L59-63)
```python
        if self.rank == 0:
            master_addr = self.get_node_ip()
            master_port = str(self.get_free_port())
            os.environ["MASTER_ADDR"] = master_addr
            os.environ["MASTER_PORT"] = master_port

File: roll/distributed/executor/worker.py (L95-113)

    @staticmethod
    def get_free_port():
        shared_storage = SharedStorage.options(
            name=STORAGE_NAME, get_if_exists=True, namespace=RAY_NAMESPACE
        ).remote()
        master_addr = Worker.get_node_ip()
        max_retry_count = int(os.environ.get("MAX_PORT_RETRY_COUNT", 1000))
        retry_count = 0
        master_port = collect_free_port()
        while retry_count < max_retry_count:
            master_addr_port_key = f"MASTER_ADDR_PORT:{master_addr}:{master_port}"
            if ray.get(shared_storage.get.remote(master_addr_port_key)) is None:
                ray.get(shared_storage.put.remote(master_addr_port_key, True))
                break
            master_port = collect_free_port()
            retry_count += 1
        if retry_count >= max_retry_count:
            raise RuntimeError(f"Can not allocate unique MASTER_PORT on {master_addr}.")
        return master_port
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐