【RL】SLIME 集群管理
的核心作用是创建一个用于分布式训练的“计算集群”。想象一下,你需要用 8 块 GPU 来训练一个大模型。预留资源:确保这 8 块 GPU 在物理上被锁定,不会被其他任务抢占。创建进程:在这些 GPU 上分别启动 8 个独立的训练进程(Ray Actors)。建立通信:让这 8 个进程能够互相通信,协同完成一个训练任务(例如,使用 PyTorch DDP、FSDP 或 Megatron-LM)。提供
import os
import ray
from ray.util.placement_group import PlacementGroup
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from slime.ray.utils import NOSET_VISIBLE_DEVICES_ENV_VARS_LIST
class RayTrainGroup:
"""
A group of ray actors
Functions start with 'async' should return list of object refs
Args:
args (Namespace): Arguments for the actor group.
num_nodes (int): Number of nodes for this actor group.
num_gpus_per_node (int): Number of gpus for this actor group.
pg (PlacementGroup, optional): Placement group to schedule actor on.
If none, create new placement group automatically. Defaults to None.
num_gpus_per_actor (float, optional): Number of gpus allocated for each actor.
If < 1.0, multiple models can share same gpu. Defaults to 1.
resources (Dict[str, float], optional): Custom resources to allocate for each actor.
See https://docs.ray.io/en/latest/ray-core/scheduling/resources.html
num_resources_per_node (int, optional): Number of custom resources to allocate for each node.
See https://docs.ray.io/en/latest/ray-core/scheduling/resources.html
"""
def __init__(
self,
args,
num_nodes,
num_gpus_per_node,
pg: tuple[PlacementGroup, list[int]],
num_gpus_per_actor: float = 1,
role: str = "actor",
) -> None:
self.args = args
self._num_nodes = num_nodes
self._num_gpus_per_node = num_gpus_per_node
self.role = role
# Allocate the GPUs for actors w/o instantiating them
self._allocate_gpus_for_actor(pg, num_gpus_per_actor)
def _allocate_gpus_for_actor(self, pg, num_gpus_per_actor):
world_size = self._num_nodes * self._num_gpus_per_node
# Use placement group to lock resources for models of same type
assert pg is not None
pg, reordered_bundle_indices = pg
env_vars = {
# because sglang will always set NCCL_CUMEM_ENABLE to 0
# we need also set it to 0 to prevent nccl error.
"NCCL_CUMEM_ENABLE": os.environ.get("NCCL_CUMEM_ENABLE", "0"),
"NVTE_FP8_BLOCK_SCALING_FP32_SCALES": "1",
**{name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST},
**self.args.train_env_vars,
}
if self.args.offload_train and self.args.train_backend == "megatron":
import torch_memory_saver
dynlib_path = os.path.join(
os.path.dirname(os.path.dirname(torch_memory_saver.__file__)),
"torch_memory_saver_hook_mode_preload.abi3.so",
)
assert os.path.exists(dynlib_path), f"LD_PRELOAD so file {dynlib_path} does not exist."
env_vars["LD_PRELOAD"] = dynlib_path
env_vars["TMS_INIT_ENABLE"] = "1"
env_vars["TMS_INIT_ENABLE_CPU_BACKUP"] = "1"
if self.args.use_routing_replay:
env_vars["ENABLE_ROUTING_REPLAY"] = "1"
backend = self.args.train_backend
if backend == "megatron":
from slime.backends.megatron_utils import MegatronTrainRayActor
actor_impl = MegatronTrainRayActor
else:
from slime.backends.fsdp_utils import FSDPTrainRayActor
actor_impl = FSDPTrainRayActor
TrainRayActor = ray.remote(num_gpus=1, runtime_env={"env_vars": env_vars})(actor_impl)
# Create worker actors
self._actor_handlers = []
master_addr, master_port = None, None
for rank in range(world_size):
actor = TrainRayActor.options(
num_cpus=num_gpus_per_actor,
num_gpus=num_gpus_per_actor,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_bundle_index=reordered_bundle_indices[rank],
),
).remote(world_size, rank, master_addr, master_port)
if rank == 0:
master_addr, master_port = ray.get(actor.get_master_addr_and_port.remote())
self._actor_handlers.append(actor)
def async_init(self, args, role, with_ref=False):
"""
Allocate GPU resourced and initialize model, optimzier, local ckpt, etc.
"""
self.args = args
return [actor.init.remote(args, role, with_ref=with_ref) for actor in self._actor_handlers]
def async_train(self, rollout_id, rollout_data_ref):
"""Do one rollout training"""
return [actor.train.remote(rollout_id, rollout_data_ref) for actor in self._actor_handlers]
def save_model(self, step_id):
"""Save actor model on rank 0."""
return ray.get([actor.save_model.remote(step_id) for actor in self._actor_handlers])
def update_weights(self):
"""Broadcast weights from rank 0 to all other ranks."""
return ray.get([actor.update_weights.remote() for actor in self._actor_handlers])
def onload(self):
return ray.get([actor.wake_up.remote() for actor in self._actor_handlers])
def offload(self):
return ray.get([actor.sleep.remote() for actor in self._actor_handlers])
def clear_memory(self):
return ray.get([actor.clear_memory.remote() for actor in self._actor_handlers])
def connect(self, critic_group):
return ray.get(
[
actor.connect_actor_critic.remote(critic)
for actor, critic in zip(self._actor_handlers, critic_group._actor_handlers, strict=False)
]
)
def set_rollout_manager(self, rollout_manager):
return ray.get([actor.set_rollout_manager.remote(rollout_manager) for actor in self._actor_handlers])
详细解释这个类
好的,我们来详细解释一下 RayTrainGroup 这个类。
这是一个精心设计的 Python 类,旨在简化和管理基于 Ray 的大规模分布式模型训练任务,特别是针对强化学习(如 RLHF)场景。它将一群 Ray Actor(分布式工作进程)封装成一个逻辑单元,提供统一的接口来控制它们的初始化、训练、模型保存和资源管理。
1. 总体概述 (High-Level Overview)
RayTrainGroup 的核心作用是创建一个用于分布式训练的“计算集群”。想象一下,你需要用 8 块 GPU 来训练一个大模型。这个类会帮你:
- 预留资源:确保这 8 块 GPU 在物理上被锁定,不会被其他任务抢占。
- 创建进程:在这些 GPU 上分别启动 8 个独立的训练进程(Ray Actors)。
- 建立通信:让这 8 个进程能够互相通信,协同完成一个训练任务(例如,使用 PyTorch DDP、FSDP 或 Megatron-LM)。
- 提供统一指挥:你可以通过
RayTrainGroup的一个实例,同时向这 8 个进程发送命令,例如“开始训练”、“保存模型”、“释放显存”等。
这个类特别适用于复杂的训练流程,比如 PPO 算法(在 RLHF 中常用),因为它需要一个"Actor 模型"组和一个"Critic 模型"组,这个类可以分别实例化来管理这两组进程。
2. 核心概念解析 (Core Concepts)
在深入代码之前,理解这几个 Ray 的核心概念至关重要:
- Ray Actor: 一个有状态的、可远程调用的 Python 对象/进程。你可以把它看作一个运行在集群某个节点上的独立服务。每个
TrainRayActor就是一个 Actor,负责在一块或多块 GPU 上进行训练。 - ObjectRef (对象引用): 当你调用一个远程方法(如
actor.train.remote())时,它不会立即返回结果,而是返回一个ObjectRef。这可以看作是一个“未来的凭证”或“指向未来结果的指针”。代码可以继续执行而无需等待结果。你可以稍后通过ray.get(object_ref)来获取实际结果。async前缀的方法就是返回ObjectRef列表的。 - PlacementGroup (放置组): 这是 Ray 的高级调度功能。它允许你原子性地预留一组资源(例如,8 个 GPU 和对应的 CPU)。只有当所有请求的资源都满足时,预留才会成功。这可以防止死锁(例如,你请求 8 个 GPU,但集群只有 7 个可用,你的任务就不会启动,而不是占着 7 个干等),并能保证 Actor 被调度到你想要的位置(例如,所有 Actor 都在同一台机器上以获得高速互联)。
3. 代码逐段详解
3.1. __init__ 构造函数
def __init__(
self,
args,
num_nodes,
num_gpus_per_node,
pg: tuple[PlacementGroup, list[int]], # 注意这里的类型
num_gpus_per_actor: float = 1,
role: str = "actor",
) -> None:
self.args = args
self._num_nodes = num_nodes
self._num_gpus_per_node = num_gpus_per_node
self.role = role
# Allocate the GPUs for actors w/o instantiating them
self._allocate_gpus_for_actor(pg, num_gpus_per_actor)
这是类的入口。当你创建一个 RayTrainGroup 实例时:
-
参数:
args: 一个包含所有配置选项的命名空间对象(通常来自argparse),例如学习率、模型大小、后端类型等。num_nodes,num_gpus_per_node: 定义了这个训练组需要多少节点和每个节点上的 GPU 数量。总的训练进程数(world_size)就是这两者的乘积。pg: tuple[PlacementGroup, list[int]]: 这是一个关键参数。它不是直接接收一个PlacementGroup,而是一个元组。PlacementGroup: 预先创建好的放置组,已经锁定了所需的资源。这个类不负责创建 PG,而是使用外部传入的 PG,这使得资源管理更加灵活,可以由更高层的逻辑统一协调多个RayTrainGroup的资源。list[int]: 这是一个重新排序过的“资源包索引”(bundle indices)。PG 将资源划分为多个 bundle(例如,每个 bundle 是{'GPU': 1, 'CPU': 8})。这个列表决定了第rank个 Actor 应该使用哪个 bundle。这对于确保rank=0的 Actor 落在特定的节点或 GPU 上非常有用。
num_gpus_per_actor: 每个 Actor 分配多少 GPU。通常是 1。如果是小数(如 0.5),则允许多个 Actor 共享同一块 GPU。role: 字符串,如 “actor” 或 “critic”,用于区分不同的训练组,方便在初始化时加载不同的模型或配置。
-
核心逻辑: 构造函数的核心工作是调用
_allocate_gpus_for_actor方法,这个方法是实际创建 Actor 的地方。
3.2. _allocate_gpus_for_actor 方法
这个私有方法是整个类的核心,负责创建和配置所有的训练 Actor。
def _allocate_gpus_for_actor(self, pg, num_gpus_per_actor):
world_size = self._num_nodes * self._num_gpus_per_node
pg, reordered_bundle_indices = pg # 解包元组
# 1. 设置环境变量 (env_vars)
env_vars = { ... }
- 环境变量设置:
NCCL_CUMEM_ENABLE: 与 NVIDIA 的 NCCL 通信库相关,为了兼容性(如注释中提到的sglang)而设置。NVTE_FP8_BLOCK_SCALING_FP32_SCALES: 启用 NVIDIA TransformerEngine 的 FP8 训练特性,表明系统设计用于最新的硬件(如 Hopper 架构 GPU)。NOSET_VISIBLE_DEVICES_ENV_VARS_LIST: 自定义的环境变量,可能是为了防止某些库(如transformers)自动设置CUDA_VISIBLE_DEVICES,从而与 Ray 的 GPU 分配机制产生冲突。Ray 会自己处理这个环境变量。LD_PRELOAD和TMS_*: 这是一个非常高级的内存优化技术。LD_PRELOAD是一个 Linux 机制,它允许你在程序启动前加载一个动态链接库 (.so文件)。- 这里的
torch_memory_saver_hook_mode_preload.abi3.so库会劫持标准的内存分配函数(如cudaMalloc)。当 PyTorch 尝试在 GPU 上分配内存时,这个库可以决定将某些张量(Tensor)自动换出(offload)到 CPU 内存或磁盘,从而节省宝贵的 GPU 显存。这对于训练超出单个 GPU 显存容量的大模型至关重要。
ENABLE_ROUTING_REPLAY: 一个自定义开关,可能与 Mixture-of-Experts (MoE) 模型的路由优化相关。
# 2. 根据后端选择 Actor 实现
backend = self.args.train_backend
if backend == "megatron":
from slime.backends.megatron_utils import MegatronTrainRayActor
actor_impl = MegatronTrainRayActor
else:
from slime.backends.fsdp_utils import FSDPTrainRayActor
actor_impl = FSDPTrainRayActor
- 后端选择: 这是一个工厂模式。根据
args.train_backend的值,它会选择不同的 Actor 实现。这使得RayTrainGroup可以灵活地支持多种分布式训练框架:megatron: 使用 NVIDIA 的 Megatron-LM 框架,适用于超大规模模型的张量并行、流水线并行和数据并行。fsdp: 使用 PyTorch 原生的 Fully Sharded Data Parallel (FSDP),这是另一种流行的大模型训练方案。
# 3. 定义 Ray Actor 类
TrainRayActor = ray.remote(num_gpus=1, runtime_env={"env_vars": env_vars})(actor_impl)
- Actor 定义:
ray.remote(...): 这是将一个普通 Python 类 (actor_impl) 转换为 Ray Actor 的装饰器。runtime_env={"env_vars": env_vars}: 这是向 Actor 注入环境变量的标准且正确的方式。Ray 会确保在启动 Actor 进程之前,这些环境变量被设置好。
# 4. 循环创建 Actor 实例
self._actor_handlers = []
master_addr, master_port = None, None
for rank in range(world_size):
actor = TrainRayActor.options(
num_cpus=num_gpus_per_actor, # 这里 num_cpus 的值可能是一个简化或约定
num_gpus=num_gpus_per_actor,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_bundle_index=reordered_bundle_indices[rank],
),
).remote(world_size, rank, master_addr, master_port)
if rank == 0:
master_addr, master_port = ray.get(actor.get_master_addr_and_port.remote())
self._actor_handlers.append(actor)
- Actor 创建循环: 这是最精彩的部分。
- 分布式通信设置:
master_addr, master_port是分布式训练的“接头暗号”。rank=0的进程是主节点(master),它会先启动,获取自己的 IP 地址和端口。然后,其他所有进程 (rank > 0) 在启动时都会被告知主节点的地址和端口,这样它们就知道去哪里集合,从而建立torch.distributed的通信组。 .options(...): 这是在实例化一个 Actor 时指定其特定资源和调度策略的方法。scheduling_strategy: 这里明确告诉 Ray:“把这个 Actor 调度到pg这个放置组中,并且使用索引为reordered_bundle_indices[rank]的那个资源包”。这保证了 Actor 会精确地使用我们之前预留的 GPU,并且顺序是确定的。.remote(...): 这会异步地创建 Actor 实例,并调用其__init__方法。self._actor_handlers: 将创建的 Actor 的句柄(handler)保存起来,以便之后通过这些句柄向 Actor 发送指令。
- 分布式通信设置:
3.3. 公共方法 (Public Methods)
这些方法提供了对整个 Actor 群体的统一控制接口。
-
async_init(self, args, role, with_ref=False)- 作用: 并行地在所有 Actor 上调用
init.remote()方法,进行模型、优化器等组件的初始化。 - 异步: 方法名以
async开头,它返回一个ObjectRef列表,调用者需要使用ray.get()来等待所有 Actor 初始化完成。
- 作用: 并行地在所有 Actor 上调用
-
async_train(self, rollout_id, rollout_data_ref)- 作用: 执行一轮训练。它将
rollout_data_ref(一个指向 Ray 对象存储中训练数据的引用) 发送给所有 Actor。使用引用 (_ref) 可以避免数据的重复拷贝,非常高效。 - 异步: 同样返回
ObjectRef列表。
- 作用: 执行一轮训练。它将
-
save_model(self, step_id),update_weights(self),onload(self),offload(self),clear_memory(self)- 作用: 这些都是管理性操作。
save_model: 保存模型(通常由 rank 0 执行)。update_weights: 将 rank 0 的权重广播给其他所有 rank。onload/offload: 对应 Actor 的wake_up/sleep。offload将模型从 GPU 移到 CPU,释放显存给其他任务(例如,一个推理服务)。onload则将其加载回 GPU。这是实现 GPU 资源分时复用的关键。clear_memory: 清理不必要的缓存。
- 同步: 注意这些方法内部调用了
ray.get()。这意味着当你调用group.save_model()时,程序会阻塞,直到所有 Actor 都完成了save_model.remote()操作。这为调用者提供了便利。
- 作用: 这些都是管理性操作。
-
connect(self, critic_group)- 作用: 这是为 RLHF 的 Actor-Critic 架构设计的。它将当前
RayTrainGroup(例如,Actor 模型组)的每个 Actor 与critic_group(Critic 模型组)中对应的 Actor 连接起来。 zip(...):zip操作确保了rank=i的 Actor 模型与rank=i的 Critic 模型建立连接,这对于 PPO 算法中计算优势函数(advantage)是必需的。
- 作用: 这是为 RLHF 的 Actor-Critic 架构设计的。它将当前
-
set_rollout_manager(self, rollout_manager)- 作用: 将一个
rollout_manager对象(可能也是一个 Ray Actor)的句柄设置给所有的训练 Actor。这可能是为了让训练 Actor 能够从数据生成端(rollout workers)拉取新的训练数据。
- 作用: 将一个
4. 工作流程和设计模式总结
- 资源预留与调度分离:
RayTrainGroup依赖外部传入的PlacementGroup,体现了良好的分层设计。上层逻辑负责资源规划,本类负责使用资源执行任务。 - 抽象与封装: 它将复杂的分布式环境设置(环境变量、通信)、Actor 创建和调度逻辑完美地封装起来,向用户暴露了简洁的
train,save,init接口。 - 灵活性与可扩展性: 通过后端选择机制,可以轻松切换
Megatron-LM和FSDP,未来也可以方便地支持其他训练框架。 - 高效的内存管理: 通过
LD_PRELOAD实现的自动 CPU offload 和手动的onload/offload接口,展示了在极限资源条件下训练大模型的先进技术。 - 为 RLHF 优化:
connect方法和role参数表明这个类是为复杂的 Actor-Critic 训练范式量身定制的。
总之,RayTrainGroup 是一个强大而优雅的工具,它是一个“分布式训练集群”的控制器,极大地降低了在 Ray 上运行大规模、多组件(Actor-Critic)训练任务的复杂度。
您好!您截取的这段代码片段 TrainRayActor = ray.remote(...)(actor_impl) 是 Ray Core 中定义一个 Actor 的经典模式。它将一个普通的 Python 类(这里是 actor_impl,我们假设它就是 MegatronTrainRayActor)转换成一个可以被 Ray 远程实例化和调用的分布式对象。
让我们来详细拆解这个语法的每一个部分:
语法拆解: ray.remote(...)(ClassName)
这实际上是一个两步操作:
-
ray.remote(...): 这是一个函数调用,它返回一个装饰器。- 括号里的参数,如
num_gpus=1,runtime_env={...},是 Actor 的选项 (Actor Options)。这些选项告诉 Ray 在创建这个 Actor 的实例时需要满足哪些资源和环境要求。
- 括号里的参数,如
-
(...)(ClassName): 上一步返回的装饰器被立即应用在ClassName(在您的例子中是actor_impl)上。- 这个装饰器会处理
ClassName,生成一个新的类,我们通常称之为 ActorClass。这个新的类包含了所有远程调用的能力。
- 这个装饰器会处理
最终,变量 TrainRayActor 被赋值为这个新生成的 ActorClass。
所以,这行代码的功能等同于标准的装饰器语法:
@ray.remote(num_gpus=1, runtime_env={"env_vars": env_vars})
class actor_impl:
# ... MegatronTrainRayActor 的所有代码 ...
# 此时,actor_impl 已经被转换成了一个 ActorClass
TrainRayActor = actor_impl
或者,如果 actor_impl 已经在别处定义好了,那么 TrainRayActor = ray.remote(...)(actor_impl) 就是将一个普通的类“注册”成一个可以被 Ray 使用的 Actor 类型。
参数详解
现在我们来看您代码中的具体参数:
-
num_gpus=1:- 含义: 这告诉 Ray 调度器,每创建一个
TrainRayActor的实例,都必须为它预留 1 个 GPU。 - 作用: 这是 Ray 进行资源管理和调度的核心。Ray 会在集群中寻找一个拥有至少 1 个空闲 GPU 的节点来放置这个 Actor。一旦 Actor 被创建,这个 GPU 就会被该 Actor 独占,直到 Actor 被销毁。
- 含义: 这告诉 Ray 调度器,每创建一个
-
runtime_env={"env_vars": env_vars}:- 含义: 这是 Ray 的 运行时环境 (Runtime Environment) 配置。它允许你为 Actor 指定一个隔离的、可复现的运行环境。
"env_vars": env_vars: 这部分是说,当 Ray 启动这个 Actor 的进程时,需要设置一些特定的环境变量。env_vars: 这应该是一个 Python 字典,包含了你想要设置的环境变量键值对。例如:env_vars = { "CUDA_VISIBLE_DEVICES": "0", # (Ray 会自动处理这个,这里只是举例) "NCCL_DEBUG": "INFO", "HTTP_PROXY": "http://your.proxy.server:port" }- 作用: 这非常重要!它确保了 Actor 在一个配置正确的环境中运行。例如,分布式训练需要的
NCCL相关环境变量、代理设置、或者特定库需要的路径都可以在这里配置,而不会污染主进程或其他 Actor 的环境。
如何使用 TrainRayActor
在执行了 TrainRayActor = ray.remote(...)(actor_impl) 之后,你就可以像下面这样来创建和使用这个 Actor 了:
# 1. 创建 Actor 的实例
# Ray 会在集群中找到一个有 GPU 的节点来运行它。
# .remote() 是 Actor 构造函数的远程调用版本。
my_actor_instance = TrainRayActor.remote(
args=some_args,
role="actor",
with_ref=True
)
# 2. 远程调用 Actor 的方法
# .remote() 也是方法调用的远程版本。
# 这个调用是非阻塞的,它会立即返回一个 ObjectRef (一个未来的凭证)。
start_id_ref = my_actor_instance.init.remote(
args=some_args,
role="actor",
with_ref=True
)
# 3. 获取结果
# ray.get() 会阻塞,直到远程方法执行完毕并返回结果。
start_rollout_id = ray.get(start_id_ref)
print(f"Actor initialized. Start rollout ID is: {start_rollout_id}")
# 4. 再次调用其他方法
my_actor_instance.train.remote(rollout_id=0, rollout_data_ref=some_data)
总结
TrainRayActor = ray.remote(num_gpus=1, runtime_env={"env_vars": env_vars})(actor_impl) 这行代码是一个声明和定义。
它向 Ray 系统声明:
“嘿,Ray!我这里有一个叫
actor_impl的 Python 类。请你把它包装成一个叫TrainRayActor的分布式 Actor 类型。以后每当我要创建一个TrainRayActor实例时,请务必给它分配 1 个 GPU,并且在启动它的进程时,把我在env_vars字典里定义的环境变量都给它设置好。”
这是一个将普通 Python 代码无缝转换为分布式组件的强大而简洁的语法,是 Ray 框架的核心魅力之一。
RayTrainGroup、TrainRayActor 和 MegatronTrainRayActor 之间是层级继承和管理关系: 1 2 3
关系概述
- RayTrainGroup 是 Ray Actor 的组管理器,负责创建和管理多个训练 Actor 实例 4
- TrainRayActor 是抽象基类,定义了训练 Actor 的通用接口 5
- MegatronTrainRayActor 继承自 TrainRayActor,是 Megatron-LM 后端的具体实现 3
详细关系
1. 继承关系
TrainRayActor 作为抽象基类,定义了 init、train、save_model、update_weights 等抽象方法 5 。MegatronTrainRayActor 实现了这些方法,专门用于 Megatron-LM 分布式训练 6 。
2. 管理关系
RayTrainGroup 根据后端类型动态选择 Actor 实现:
RayTrainGroup 创建多个 Actor 实例并管理它们的生命周期 9 。
3. 初始化流程
Notes
除了 MegatronTrainRayActor,还有 FSDPTrainRayActor 作为另一个后端实现 10 。RayTrainGroup 通过配置参数动态选择使用哪个具体实现。
Wiki pages you might want to explore:
Citations
File: slime/ray/actor_group.py (L10-27)
class RayTrainGroup:
"""
A group of ray actors
Functions start with 'async' should return list of object refs
Args:
args (Namespace): Arguments for the actor group.
num_nodes (int): Number of nodes for this actor group.
num_gpus_per_node (int): Number of gpus for this actor group.
pg (PlacementGroup, optional): Placement group to schedule actor on.
If none, create new placement group automatically. Defaults to None.
num_gpus_per_actor (float, optional): Number of gpus allocated for each actor.
If < 1.0, multiple models can share same gpu. Defaults to 1.
resources (Dict[str, float], optional): Custom resources to allocate for each actor.
See https://docs.ray.io/en/latest/ray-core/scheduling/resources.html
num_resources_per_node (int, optional): Number of custom resources to allocate for each node.
See https://docs.ray.io/en/latest/ray-core/scheduling/resources.html
"""
File: slime/ray/actor_group.py (L78-82)
backend = self.args.train_backend
if backend == "megatron":
from slime.backends.megatron_utils import MegatronTrainRayActor
actor_impl = MegatronTrainRayActor
File: slime/ray/actor_group.py (L84-87)
else:
from slime.backends.fsdp_utils import FSDPTrainRayActor
actor_impl = FSDPTrainRayActor
File: slime/ray/actor_group.py (L92-106)
self._actor_handlers = []
master_addr, master_port = None, None
for rank in range(world_size):
actor = TrainRayActor.options(
num_cpus=num_gpus_per_actor,
num_gpus=num_gpus_per_actor,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_bundle_index=reordered_bundle_indices[rank],
),
).remote(world_size, rank, master_addr, master_port)
if rank == 0:
master_addr, master_port = ray.get(actor.get_master_addr_and_port.remote())
self._actor_handlers.append(actor)
File: slime/ray/actor_group.py (L107-112)
def async_init(self, args, role, with_ref=False):
"""
Allocate GPU resourced and initialize model, optimzier, local ckpt, etc.
"""
self.args = args
return [actor.init.remote(args, role, with_ref=with_ref) for actor in self._actor_handlers]
File: slime/ray/train_actor.py (L28-44)
class TrainRayActor(RayActor):
def __init__(self, world_size, rank, master_addr, master_port):
configure_logger()
self._world_size = world_size
self._rank = rank
if master_addr:
self.master_addr, self.master_port = master_addr, master_port
else:
self.master_addr, self.master_port = self._get_current_node_ip_and_free_port(
start_port=random.randint(20000, 21000)
)
os.environ["MASTER_ADDR"] = self.master_addr
os.environ["MASTER_PORT"] = str(self.master_port)
os.environ["WORLD_SIZE"] = str(self._world_size)
os.environ["RANK"] = str(self._rank)
File: slime/ray/train_actor.py (L103-129)
@abc.abstractmethod
def sleep(self, tags):
raise NotImplementedError
@abc.abstractmethod
def wake_up(self, tags):
raise NotImplementedError
@abc.abstractmethod
def train(self, rollout_id, rollout_data_ref):
raise NotImplementedError
@abc.abstractmethod
def save_model(self, rollout_id, force_sync=False):
raise NotImplementedError
@abc.abstractmethod
def update_weights(self):
raise NotImplementedError
@abc.abstractmethod
def connect_actor_critic(self, critic_group):
raise NotImplementedError
@abc.abstractmethod
def _get_parallel_config(self):
raise NotImplementedError
File: slime/backends/megatron_utils/actor.py (L46-46)
class MegatronTrainRayActor(TrainRayActor):
File: slime/backends/megatron_utils/actor.py (L48-157)
def init(
self,
args: Namespace,
role: str,
with_ref: bool = False,
) -> int | None:
monkey_patch_torch_dist()
super().init(args, role, with_ref)
init(args)
if is_megatron_main_rank():
init_tracking(args, primary=False)
self.prof = TrainProfiler(args)
# read config and tokenizer serialized to prevent concurrent writing bug.
for i in range(dist.get_world_size()):
if i == dist.get_rank():
self.hf_config = AutoConfig.from_pretrained(args.hf_checkpoint, trust_remote_code=True)
self.tokenizer = AutoTokenizer.from_pretrained(self.args.hf_checkpoint, trust_remote_code=True)
if args.megatron_to_hf_mode == "bridge":
args.bridge = AutoBridge.from_hf_pretrained(args.hf_checkpoint, trust_remote_code=True)
dist.barrier(group=get_gloo_group())
self.train_parallel_config = {
"dp_size": mpu.get_data_parallel_world_size(with_context_parallel=False),
}
dist.barrier(group=get_gloo_group())
if args.offload_train:
if (x := args.train_memory_margin_bytes) > 0:
logger.info(f"Set torch_memory_saver.memory_margin_bytes to {x}")
torch_memory_saver.memory_margin_bytes = x
if self.args.debug_rollout_only:
return 0
if role == "critic":
self.args.load = self.args.critic_load
self.args.save = self.args.critic_save
self.args.lr = self.args.critic_lr
self.args.lr_warmup_iters = self.args.critic_lr_warmup_iters
(self.model, self.optimizer, self.opt_param_scheduler, loaded_rollout_id) = initialize_model_and_optimizer(
args, role
)
if role == "critic":
if self.args.offload_train:
self.sleep()
return
start_rollout_id = loaded_rollout_id + 1
self.weights_backuper = TensorBackuper.create(
source_getter=lambda: named_params_and_buffers(
self.args,
self.model,
convert_to_global_name=args.megatron_to_hf_mode == "raw",
translate_gpu_to_cpu=not self.args.enable_weights_backuper,
),
single_tag=None if args.enable_weights_backuper else "actor",
)
self._active_model_tag: str | None = "actor"
self.weights_backuper.backup("actor")
if with_ref:
self.load_other_checkpoint("ref", args.ref_load)
if self.args.keep_old_actor:
# Load old_actor checkpoint
self.load_other_checkpoint("old_actor", args.load)
# Create rollout_actor as a copy of current actor
if args.update_weights_interval == 1:
self.weights_backuper.backup("rollout_actor")
if self.args.vocab_size is None:
self.args.vocab_size = self.tokenizer.vocab_size
update_weight_cls = UpdateWeightFromTensor if self.args.colocate else UpdateWeightFromDistributed
self.weight_updater = update_weight_cls(
self.args,
self.model,
weights_getter=lambda: self.weights_backuper.get("actor"),
model_name=type(self.hf_config).__name__.lower() if self.args.model_name is None else self.args.model_name,
quantization_config=getattr(self.hf_config, "quantization_config", None),
)
# empty cache after initialization
clear_memory()
if self.args.offload_train:
# recover to actor in the end.
self._switch_model("actor")
self.sleep()
self.rollout_engines = None
self.rollout_data_postprocess = None
if self.args.rollout_data_postprocess_path is not None:
from slime.utils.misc import load_function
self.rollout_data_postprocess = load_function(self.args.rollout_data_postprocess_path)
self.prof.on_init_end()
return start_rollout_id
File: slime/backends/fsdp_utils/actor.py (L38-49)
class FSDPTrainRayActor(TrainRayActor):
"""Simplified TrainRayActor for pure HF+FSDP training.
Responsibilities:
* Initialize model/tokenizer on rank0 sequentially to avoid race on cache
* Wrap model with FSDP
* Provide minimal train / save / update_weights hooks compatible with existing RayTrainGroup
Weight update strategy:
* Rank0 gathers state_dict (full) and broadcasts tensor-by-tensor.
* For small models this is fine; for larger models consider sharded state_dict type.
"""
更多推荐



所有评论(0)