【RL】ROLL cluster 内worker 通信方式
Mermaid 渲染失败: Trying to inactivate an inactive participant (ray)总而言之,这个架构将**编排(Cluster和执行(Worker)**完美地分离开来,利用Ray强大的底层分布式能力,实现了一个清晰、可扩展且功能强大的分布式计算框架。cluster 中 port干嘛用的,runtime_env = RuntimeEnv(env_vars
好的,我们来用中文详细解释一下这个执行流程。
这个架构是分布式计算中非常经典的 “控制器-工作者”(Controller-Worker)模式 的一种高效实现。
为了方便理解,我们可以使用一个生动的比喻:指挥家和交响乐团。
核心关系:Cluster vs. Worker
-
Worker(演奏家):Worker是一个独立的、实际执行计算任务的单元。它是一个ray.Actor,意味着它是一个有自己状态的独立进程。它持有模型的一部分,与指定的GPU绑定,并负责执行具体的计算,比如模型推理、训练、参数更新等。它知道自己的身份(比如它是乐团的第几号小提琴手),但通常不关心整个乐团的宏观调度。 -
Cluster(指挥家):Cluster是一个逻辑上的控制器,它管理着一组Worker(整个乐团)。它自己不执行任何繁重的计算(指挥家自己不拉小提琴),而是负责编排和指挥所有的Worker。用户的代码主要与Cluster交互。当你想让整个团队做某件事时,你向Cluster下达指令,它会把指令精准地分发给相应的Worker。
下面是它们角色的总结:
| 方面 | Worker (执行者) |
Cluster (协调者) |
|---|---|---|
| 本质 | 一个 ray.Actor,即一个独立的、有状态的进程。 |
一个普通的Python对象,但它持有对所有 Worker Actor的远程引用(ActorHandles)。 |
| 职责 | - 执行计算任务(如 model_update)。- 管理自身状态(模型权重、优化器等)。 - 与分配给它的硬件(GPU/NPU)交互。 |
- 创建、配置并将 Worker Actor放置到指定的硬件上。- 提供统一的API,向全部或部分 Worker分派命令。- 从 Worker 收集并聚合结果。 |
| 感知范围 | 只知道自己的 rank 和总 world_size。 |
了解所有 Worker,知道它们的 rank,并能作为一个整体与它们通信。 |
| 任务示例 | self.strategy.model_update(): 运行一个前向/反向传播。 |
my_cluster.start_model_update(): 告诉每一个Worker去执行它们各自的model_update方法。 |
详细执行流程(分步解析)
让我们追踪从创建到执行命令的完整生命周期。
第一步:Cluster 的初始化(组建乐团)
这是“指挥家”组建并安排“乐团”的过程。
-
用户代码: 你首先创建一个
Cluster实例。# 简化示例 train_cluster = Cluster( name="trainer", worker_cls=MyTrainingWorker, resource_manager=my_resource_manager, worker_config=my_worker_config ) -
资源分配 (
ResourceManager):Cluster的__init__方法会立即调用resource_manager.allocate_placement_group()。ResourceManager之前已经通过ray.util.placement_group在Ray集群上预留了资源。PlacementGroup是Ray的一个特性,它能保证一组资源(比如“某个节点上的8个GPU”)被整体预留,不会被其他任务抢占。这个过程就像在物理机房里“圈地”。allocate_placement_group将逻辑上的world_size(例如,需要16个Worker)映射到物理硬件上。它会返回每个Worker所需的资源规格列表,例如:# 对于 rank=0 的 Worker,它需要使用0号节点上的0号GPU [ [ {'node_rank': 0, 'gpu_rank': 0, 'placement_group': <PG_object_for_node0>} ], # Worker 0 的资源 [ {'node_rank': 0, 'gpu_rank': 1, 'placement_group': <PG_object_for_node0>} ], # Worker 1 的资源 ... ]
-
创建 Worker (
Cluster._create_workers):Cluster从rank = 0循环到world_size - 1。- 对于每一个
rank:- 它准备一套环境变量(
RANK,WORLD_SIZE,MASTER_ADDR,CUDA_VISIBLE_DEVICES等)。这是每个Worker进程启动后了解自己**“身份ID”**和如何加入分布式通信组(如torch.distributed)的方式。 - 它使用
self.worker_cls.options(...).remote(...)来创建WorkerActor。 - 最关键的一步是,
.options()调用中传入了scheduling_strategy,并指定了从ResourceManager获取的PlacementGroup。这精确地告诉Ray:“请把这个WorkerActor启动到我指定的那个节点的那块GPU上”。 remote()调用返回一个ActorHandle(对Worker的远程引用),这个引用被存放在self.workers列表中。
- 它准备一套环境变量(
-
方法绑定 (
Cluster._bind_worker_method):- 这是最“神奇”的部分,实现了一种动态代理。
Cluster检查worker_cls(Worker类)的所有方法,寻找那些被@register装饰器标记过的方法(例如Worker中的load_states)。- 对于每个找到的方法,它在
Cluster实例上动态创建一个同名的新方法(例如cluster.load_states)。 - 这个新方法是一个代理。当你调用它时,它并不会执行
Cluster自己的代码,而是执行预先配置好的分发逻辑,这个逻辑知道如何将这个调用请求转发给所有实际的WorkerActor。
初始化结束后,Cluster 对象就准备好了。它管理着一个运行在指定GPU上的 Worker Actor列表,并且拥有了一套可以直接调用的代理方法,随时准备向 Worker 们下达指令。
第二步:执行一个命令(开始演奏)
现在,我们看看当你调用 Cluster 上的一个方法时会发生什么。
-
用户代码: 你调用了上一步动态绑定的代理方法。
# 这个调用是在 Cluster 对象上发起的 results = train_cluster.start_model_update(...)这里以
ModelUpdateGroup中的调用为例。 -
代理方法被触发:
- 这个调用触发了为
start_model_update方法生成的代理函数。 - 该函数会读取
Worker类中对应方法上的@register装饰器信息。例如,load_states方法的装饰器是@register(dispatch_mode=Dispatch.ONE_TO_ALL)。 - 代理函数根据
dispatch_mode知道应该如何分发这个调用。ONE_TO_ALL意味着需要把这个调用发给所有的Worker。
- 这个调用触发了为
-
分发和远程执行 (
Cluster.execute_all_async):- 代理函数内部会调用像
self.execute_all_async("start_model_update", *args, **kwargs)这样的辅助函数。 execute_all_async遍历它持有的Worker句柄列表 (self.workers)。- 对于列表中的每一个
worker,它发起一个远程调用:worker.start_model_update.remote(*args, **kwargs)。 - 这个
.remote()调用是异步的。它会立即返回一个ray.ObjectRef(可以理解为一个“提货单”或“future”),而不会等待远程任务完成。 execute_all_async最终返回一个由这些“提货单”组成的列表。
- 代理函数内部会调用像
-
结果收集 (
ray.get):ModelUpdateGroup.model_update中的代码展示了最后一步:# ray.get 会阻塞程序,直到所有远程任务完成 dataprotos: list[DataProto] = ray.get(...)ray.get()接收这个“提货单”列表,然后阻塞程序的执行,直到所有远程Worker的任务都执行完毕。- 一旦所有
Worker都返回了结果,ray.get()就会将所有结果收集起来,返回一个包含每个Worker返回值的真实列表(例如,一个由DataProto对象组成的列表)。 - 最后,可以对这些来自不同
Worker的结果进行聚合,例如reduce_metrics_list。
流程总结图
总而言之,这个架构将**编排(Cluster)和执行(Worker)**完美地分离开来,利用Ray强大的底层分布式能力,实现了一个清晰、可扩展且功能强大的分布式计算框架。
cluster 中 port干嘛用的,runtime_env = RuntimeEnv(env_vars=env_vars), 什么时创建多个worker,worker 中的port 又是干嘛的?
Cluster中Port和Worker创建机制详解
Cluster中的Port用途
Cluster中的master_port主要用于分布式通信协调。在Cluster._create_workers()中,rank为0的worker会获取主节点地址和端口,并共享给所有其他worker 1 :
if rank == 0:
self.master_addr, self.master_port = ray.get(worker.get_master_addr_and_port.remote())
这个端口是PyTorch分布式进程组通信的关键组件,用于worker间的参数同步和数据交换。
RuntimeEnv的作用
RuntimeEnv(env_vars=env_vars)是Ray的运行时环境配置,用于为每个worker设置必要的环境变量 2 :
runtime_env = RuntimeEnv(env_vars=env_vars)
worker_options = {
"runtime_env": runtime_env,
# ... 其他配置
}
环境变量包括:
WORLD_SIZE:总worker数量RANK:当前worker的rankMASTER_ADDR/MASTER_PORT:主节点通信地址CUDA_VISIBLE_DEVICES:可见GPU设备
多Worker创建时机
多个worker的创建由world_size参数决定,该参数通过以下公式计算 3 :
world_size = len(device_mapping) // num_gpus_per_worker
在Cluster._create_workers()中,系统会为每个rank创建一个worker实例 4 。
Worker中的Port用途
Worker中的port主要用于分布式通信初始化。在Worker.__init__()中,rank为0的worker会:
- 获取本机IP地址作为主节点地址 5
- 分配一个空闲端口作为通信端口
- 将地址和端口存储到共享存储中供其他worker使用
if self.rank == 0:
master_addr = self.get_node_ip()
master_port = str(self.get_free_port())
os.environ["MASTER_ADDR"] = master_addr
os.environ["MASTER_PORT"] = master_port
get_free_port()方法确保端口唯一性,避免冲突 6 。
通信流程
Notes
- Cluster中的port是分布式通信的枢纽,确保所有worker能够建立通信连接
- RuntimeEnv是Ray提供的环境隔离机制,保证每个worker获得正确的运行环境
- Worker数量由资源配置自动计算,支持灵活的GPU分配策略
- 端口分配机制确保多进程环境下的通信唯一性
Wiki pages you might want to explore:
Citations
File: roll/distributed/executor/cluster.py (L100-145)
for rank, pgs in enumerate(placement_groups):
deploy_pg = pgs[0]
pg_zero_gpu_ranks = sorted([pg["gpu_rank"] for pg in pgs if pg["node_rank"] == deploy_pg["node_rank"]])
worker_name = f"{self.cluster_name}-{rank}"
env_vars = {
"WORLD_SIZE": str(self.world_size),
"RANK": str(rank),
"LOCAL_RANK": str(0),
"CLUSTER_NAME": self.cluster_name,
"WORKER_NAME": worker_name,
}
if rank != 0:
env_vars["MASTER_ADDR"] = self.master_addr
env_vars["MASTER_PORT"] = str(self.master_port)
if deploy_pg["gpu_rank"] is not None:
current_platform.update_env_vars_for_visible_devices(env_vars=env_vars, gpu_ranks=pg_zero_gpu_ranks)
if "ROLL_LOG_DIR" in os.environ:
env_vars["ROLL_LOG_DIR"] = os.environ["ROLL_LOG_DIR"]
env_vars.update(self.worker_config.system_envs)
runtime_env = RuntimeEnv(env_vars=env_vars)
self.worker_config.resource_placement_groups = pgs
worker_options = {
"scheduling_strategy": PlacementGroupSchedulingStrategy(placement_group=deploy_pg["placement_group"]),
"name": worker_name,
"namespace": RAY_NAMESPACE,
"runtime_env": runtime_env,
"num_cpus": 0.01,
}
if current_platform.ray_device_key == "GPU":
worker_options.update({"num_gpus": 0.01 if self.worker_config.device_mapping else 0})
elif current_platform.ray_device_key == "NPU":
worker_options.update(
{
"num_gpus": 0,
"resources": {
current_platform.ray_device_key: 0.01 if self.worker_config.device_mapping else 0
},
}
)
worker = self.worker_cls.options(**worker_options).remote(worker_config=self.worker_config)
self.workers.append(worker)
File: roll/distributed/executor/cluster.py (L146-148)
if rank == 0:
self.master_addr, self.master_port = ray.get(worker.get_master_addr_and_port.remote())
File: docs_roll/docs/User Guides/Configuration/device_mapping.md (L80-84)
The number of workers (`world_size`) is automatically calculated based on the `device_mapping` and `num_gpus_per_worker` parameters:
```python
world_size = len(device_mapping) // num_gpus_per_worker
**File:** roll/distributed/executor/worker.py (L59-63)
```python
if self.rank == 0:
master_addr = self.get_node_ip()
master_port = str(self.get_free_port())
os.environ["MASTER_ADDR"] = master_addr
os.environ["MASTER_PORT"] = master_port
File: roll/distributed/executor/worker.py (L95-113)
@staticmethod
def get_free_port():
shared_storage = SharedStorage.options(
name=STORAGE_NAME, get_if_exists=True, namespace=RAY_NAMESPACE
).remote()
master_addr = Worker.get_node_ip()
max_retry_count = int(os.environ.get("MAX_PORT_RETRY_COUNT", 1000))
retry_count = 0
master_port = collect_free_port()
while retry_count < max_retry_count:
master_addr_port_key = f"MASTER_ADDR_PORT:{master_addr}:{master_port}"
if ray.get(shared_storage.get.remote(master_addr_port_key)) is None:
ray.get(shared_storage.put.remote(master_addr_port_key, True))
break
master_port = collect_free_port()
retry_count += 1
if retry_count >= max_retry_count:
raise RuntimeError(f"Can not allocate unique MASTER_PORT on {master_addr}.")
return master_port
更多推荐
所有评论(0)