import os

import ray
from ray.util.placement_group import PlacementGroup
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

from slime.ray.utils import NOSET_VISIBLE_DEVICES_ENV_VARS_LIST


class RayTrainGroup:
    """
    A group of ray actors
    Functions start with 'async' should return list of object refs

    Args:
        args (Namespace): Arguments for the actor group.
        num_nodes (int): Number of nodes for this actor group.
        num_gpus_per_node (int): Number of gpus for this actor group.
        pg (PlacementGroup, optional): Placement group to schedule actor on.
            If none, create new placement group automatically. Defaults to None.
        num_gpus_per_actor (float, optional): Number of gpus allocated for each actor.
            If < 1.0, multiple models can share same gpu. Defaults to 1.
        resources (Dict[str, float], optional): Custom resources to allocate for each actor.
            See https://docs.ray.io/en/latest/ray-core/scheduling/resources.html
        num_resources_per_node (int, optional): Number of custom resources to allocate for each node.
            See https://docs.ray.io/en/latest/ray-core/scheduling/resources.html
    """

    def __init__(
        self,
        args,
        num_nodes,
        num_gpus_per_node,
        pg: tuple[PlacementGroup, list[int]],
        num_gpus_per_actor: float = 1,
        role: str = "actor",
    ) -> None:
        self.args = args
        self._num_nodes = num_nodes
        self._num_gpus_per_node = num_gpus_per_node
        self.role = role

        # Allocate the GPUs for actors w/o instantiating them
        self._allocate_gpus_for_actor(pg, num_gpus_per_actor)

    def _allocate_gpus_for_actor(self, pg, num_gpus_per_actor):
        world_size = self._num_nodes * self._num_gpus_per_node

        # Use placement group to lock resources for models of same type
        assert pg is not None
        pg, reordered_bundle_indices = pg

        env_vars = {
            # because sglang will always set NCCL_CUMEM_ENABLE to 0
            # we need also set it to 0 to prevent nccl error.
            "NCCL_CUMEM_ENABLE": os.environ.get("NCCL_CUMEM_ENABLE", "0"),
            "NVTE_FP8_BLOCK_SCALING_FP32_SCALES": "1",
            **{name: "1" for name in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST},
            **self.args.train_env_vars,
        }

        if self.args.offload_train and self.args.train_backend == "megatron":
            import torch_memory_saver

            dynlib_path = os.path.join(
                os.path.dirname(os.path.dirname(torch_memory_saver.__file__)),
                "torch_memory_saver_hook_mode_preload.abi3.so",
            )
            assert os.path.exists(dynlib_path), f"LD_PRELOAD so file {dynlib_path} does not exist."

            env_vars["LD_PRELOAD"] = dynlib_path
            env_vars["TMS_INIT_ENABLE"] = "1"
            env_vars["TMS_INIT_ENABLE_CPU_BACKUP"] = "1"

        if self.args.use_routing_replay:
            env_vars["ENABLE_ROUTING_REPLAY"] = "1"

        backend = self.args.train_backend
        if backend == "megatron":
            from slime.backends.megatron_utils import MegatronTrainRayActor

            actor_impl = MegatronTrainRayActor

        else:
            from slime.backends.fsdp_utils import FSDPTrainRayActor

            actor_impl = FSDPTrainRayActor

        TrainRayActor = ray.remote(num_gpus=1, runtime_env={"env_vars": env_vars})(actor_impl)

        # Create worker actors
        self._actor_handlers = []
        master_addr, master_port = None, None
        for rank in range(world_size):
            actor = TrainRayActor.options(
                num_cpus=num_gpus_per_actor,
                num_gpus=num_gpus_per_actor,
                scheduling_strategy=PlacementGroupSchedulingStrategy(
                    placement_group=pg,
                    placement_group_bundle_index=reordered_bundle_indices[rank],
                ),
            ).remote(world_size, rank, master_addr, master_port)
            if rank == 0:
                master_addr, master_port = ray.get(actor.get_master_addr_and_port.remote())
            self._actor_handlers.append(actor)

    def async_init(self, args, role, with_ref=False):
        """
        Allocate GPU resourced and initialize model, optimzier, local ckpt, etc.
        """
        self.args = args
        return [actor.init.remote(args, role, with_ref=with_ref) for actor in self._actor_handlers]

    def async_train(self, rollout_id, rollout_data_ref):
        """Do one rollout training"""
        return [actor.train.remote(rollout_id, rollout_data_ref) for actor in self._actor_handlers]

    def save_model(self, step_id):
        """Save actor model on rank 0."""
        return ray.get([actor.save_model.remote(step_id) for actor in self._actor_handlers])

    def update_weights(self):
        """Broadcast weights from rank 0 to all other ranks."""
        return ray.get([actor.update_weights.remote() for actor in self._actor_handlers])

    def onload(self):
        return ray.get([actor.wake_up.remote() for actor in self._actor_handlers])

    def offload(self):
        return ray.get([actor.sleep.remote() for actor in self._actor_handlers])

    def clear_memory(self):
        return ray.get([actor.clear_memory.remote() for actor in self._actor_handlers])

    def connect(self, critic_group):
        return ray.get(
            [
                actor.connect_actor_critic.remote(critic)
                for actor, critic in zip(self._actor_handlers, critic_group._actor_handlers, strict=False)
            ]
        )

    def set_rollout_manager(self, rollout_manager):
        return ray.get([actor.set_rollout_manager.remote(rollout_manager) for actor in self._actor_handlers])
详细解释这个类

好的,我们来详细解释一下 RayTrainGroup 这个类。

这是一个精心设计的 Python 类,旨在简化和管理基于 Ray 的大规模分布式模型训练任务,特别是针对强化学习(如 RLHF)场景。它将一群 Ray Actor(分布式工作进程)封装成一个逻辑单元,提供统一的接口来控制它们的初始化、训练、模型保存和资源管理。


1. 总体概述 (High-Level Overview)

RayTrainGroup 的核心作用是创建一个用于分布式训练的“计算集群”。想象一下,你需要用 8 块 GPU 来训练一个大模型。这个类会帮你:

  1. 预留资源:确保这 8 块 GPU 在物理上被锁定,不会被其他任务抢占。
  2. 创建进程:在这些 GPU 上分别启动 8 个独立的训练进程(Ray Actors)。
  3. 建立通信:让这 8 个进程能够互相通信,协同完成一个训练任务(例如,使用 PyTorch DDP、FSDP 或 Megatron-LM)。
  4. 提供统一指挥:你可以通过 RayTrainGroup 的一个实例,同时向这 8 个进程发送命令,例如“开始训练”、“保存模型”、“释放显存”等。

这个类特别适用于复杂的训练流程,比如 PPO 算法(在 RLHF 中常用),因为它需要一个"Actor 模型"组和一个"Critic 模型"组,这个类可以分别实例化来管理这两组进程。


2. 核心概念解析 (Core Concepts)

在深入代码之前,理解这几个 Ray 的核心概念至关重要:

  • Ray Actor: 一个有状态的、可远程调用的 Python 对象/进程。你可以把它看作一个运行在集群某个节点上的独立服务。每个 TrainRayActor 就是一个 Actor,负责在一块或多块 GPU 上进行训练。
  • ObjectRef (对象引用): 当你调用一个远程方法(如 actor.train.remote())时,它不会立即返回结果,而是返回一个 ObjectRef。这可以看作是一个“未来的凭证”或“指向未来结果的指针”。代码可以继续执行而无需等待结果。你可以稍后通过 ray.get(object_ref) 来获取实际结果。async 前缀的方法就是返回 ObjectRef 列表的。
  • PlacementGroup (放置组): 这是 Ray 的高级调度功能。它允许你原子性地预留一组资源(例如,8 个 GPU 和对应的 CPU)。只有当所有请求的资源都满足时,预留才会成功。这可以防止死锁(例如,你请求 8 个 GPU,但集群只有 7 个可用,你的任务就不会启动,而不是占着 7 个干等),并能保证 Actor 被调度到你想要的位置(例如,所有 Actor 都在同一台机器上以获得高速互联)。

3. 代码逐段详解

3.1. __init__ 构造函数
def __init__(
    self,
    args,
    num_nodes,
    num_gpus_per_node,
    pg: tuple[PlacementGroup, list[int]], # 注意这里的类型
    num_gpus_per_actor: float = 1,
    role: str = "actor",
) -> None:
    self.args = args
    self._num_nodes = num_nodes
    self._num_gpus_per_node = num_gpus_per_node
    self.role = role

    # Allocate the GPUs for actors w/o instantiating them
    self._allocate_gpus_for_actor(pg, num_gpus_per_actor)

这是类的入口。当你创建一个 RayTrainGroup 实例时:

  • 参数:

    • args: 一个包含所有配置选项的命名空间对象(通常来自 argparse),例如学习率、模型大小、后端类型等。
    • num_nodes, num_gpus_per_node: 定义了这个训练组需要多少节点和每个节点上的 GPU 数量。总的训练进程数(world_size)就是这两者的乘积。
    • pg: tuple[PlacementGroup, list[int]]: 这是一个关键参数。它不是直接接收一个 PlacementGroup,而是一个元组。
      • PlacementGroup: 预先创建好的放置组,已经锁定了所需的资源。这个类不负责创建 PG,而是使用外部传入的 PG,这使得资源管理更加灵活,可以由更高层的逻辑统一协调多个 RayTrainGroup 的资源。
      • list[int]: 这是一个重新排序过的“资源包索引”(bundle indices)。PG 将资源划分为多个 bundle(例如,每个 bundle 是 {'GPU': 1, 'CPU': 8})。这个列表决定了第 rank 个 Actor 应该使用哪个 bundle。这对于确保 rank=0 的 Actor 落在特定的节点或 GPU 上非常有用。
    • num_gpus_per_actor: 每个 Actor 分配多少 GPU。通常是 1。如果是小数(如 0.5),则允许多个 Actor 共享同一块 GPU。
    • role: 字符串,如 “actor” 或 “critic”,用于区分不同的训练组,方便在初始化时加载不同的模型或配置。
  • 核心逻辑: 构造函数的核心工作是调用 _allocate_gpus_for_actor 方法,这个方法是实际创建 Actor 的地方。

3.2. _allocate_gpus_for_actor 方法

这个私有方法是整个类的核心,负责创建和配置所有的训练 Actor。

def _allocate_gpus_for_actor(self, pg, num_gpus_per_actor):
    world_size = self._num_nodes * self._num_gpus_per_node
    pg, reordered_bundle_indices = pg # 解包元组

    # 1. 设置环境变量 (env_vars)
    env_vars = { ... }
  • 环境变量设置:
    • NCCL_CUMEM_ENABLE: 与 NVIDIA 的 NCCL 通信库相关,为了兼容性(如注释中提到的 sglang)而设置。
    • NVTE_FP8_BLOCK_SCALING_FP32_SCALES: 启用 NVIDIA TransformerEngine 的 FP8 训练特性,表明系统设计用于最新的硬件(如 Hopper 架构 GPU)。
    • NOSET_VISIBLE_DEVICES_ENV_VARS_LIST: 自定义的环境变量,可能是为了防止某些库(如 transformers)自动设置 CUDA_VISIBLE_DEVICES,从而与 Ray 的 GPU 分配机制产生冲突。Ray 会自己处理这个环境变量。
    • LD_PRELOADTMS_*: 这是一个非常高级的内存优化技术
      • LD_PRELOAD 是一个 Linux 机制,它允许你在程序启动前加载一个动态链接库 (.so 文件)。
      • 这里的 torch_memory_saver_hook_mode_preload.abi3.so 库会劫持标准的内存分配函数(如 cudaMalloc)。当 PyTorch 尝试在 GPU 上分配内存时,这个库可以决定将某些张量(Tensor)自动换出(offload)到 CPU 内存或磁盘,从而节省宝贵的 GPU 显存。这对于训练超出单个 GPU 显存容量的大模型至关重要。
    • ENABLE_ROUTING_REPLAY: 一个自定义开关,可能与 Mixture-of-Experts (MoE) 模型的路由优化相关。
    # 2. 根据后端选择 Actor 实现
    backend = self.args.train_backend
    if backend == "megatron":
        from slime.backends.megatron_utils import MegatronTrainRayActor
        actor_impl = MegatronTrainRayActor
    else:
        from slime.backends.fsdp_utils import FSDPTrainRayActor
        actor_impl = FSDPTrainRayActor
  • 后端选择: 这是一个工厂模式。根据 args.train_backend 的值,它会选择不同的 Actor 实现。这使得 RayTrainGroup 可以灵活地支持多种分布式训练框架:
    • megatron: 使用 NVIDIA 的 Megatron-LM 框架,适用于超大规模模型的张量并行、流水线并行和数据并行。
    • fsdp: 使用 PyTorch 原生的 Fully Sharded Data Parallel (FSDP),这是另一种流行的大模型训练方案。
    # 3. 定义 Ray Actor 类
    TrainRayActor = ray.remote(num_gpus=1, runtime_env={"env_vars": env_vars})(actor_impl)
  • Actor 定义:
    • ray.remote(...): 这是将一个普通 Python 类 (actor_impl) 转换为 Ray Actor 的装饰器。
    • runtime_env={"env_vars": env_vars}: 这是向 Actor 注入环境变量的标准且正确的方式。Ray 会确保在启动 Actor 进程之前,这些环境变量被设置好。
    # 4. 循环创建 Actor 实例
    self._actor_handlers = []
    master_addr, master_port = None, None
    for rank in range(world_size):
        actor = TrainRayActor.options(
            num_cpus=num_gpus_per_actor, # 这里 num_cpus 的值可能是一个简化或约定
            num_gpus=num_gpus_per_actor,
            scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=pg,
                placement_group_bundle_index=reordered_bundle_indices[rank],
            ),
        ).remote(world_size, rank, master_addr, master_port)
        if rank == 0:
            master_addr, master_port = ray.get(actor.get_master_addr_and_port.remote())
        self._actor_handlers.append(actor)
  • Actor 创建循环: 这是最精彩的部分。
    • 分布式通信设置: master_addr, master_port 是分布式训练的“接头暗号”。rank=0 的进程是主节点(master),它会先启动,获取自己的 IP 地址和端口。然后,其他所有进程 (rank > 0) 在启动时都会被告知主节点的地址和端口,这样它们就知道去哪里集合,从而建立 torch.distributed 的通信组。
    • .options(...): 这是在实例化一个 Actor 时指定其特定资源和调度策略的方法。
    • scheduling_strategy: 这里明确告诉 Ray:“把这个 Actor 调度到 pg 这个放置组中,并且使用索引为 reordered_bundle_indices[rank] 的那个资源包”。这保证了 Actor 会精确地使用我们之前预留的 GPU,并且顺序是确定的。
    • .remote(...): 这会异步地创建 Actor 实例,并调用其 __init__ 方法。
    • self._actor_handlers: 将创建的 Actor 的句柄(handler)保存起来,以便之后通过这些句柄向 Actor 发送指令。
3.3. 公共方法 (Public Methods)

这些方法提供了对整个 Actor 群体的统一控制接口。

  • async_init(self, args, role, with_ref=False)

    • 作用: 并行地在所有 Actor 上调用 init.remote() 方法,进行模型、优化器等组件的初始化。
    • 异步: 方法名以 async 开头,它返回一个 ObjectRef 列表,调用者需要使用 ray.get() 来等待所有 Actor 初始化完成。
  • async_train(self, rollout_id, rollout_data_ref)

    • 作用: 执行一轮训练。它将 rollout_data_ref (一个指向 Ray 对象存储中训练数据的引用) 发送给所有 Actor。使用引用 (_ref) 可以避免数据的重复拷贝,非常高效。
    • 异步: 同样返回 ObjectRef 列表。
  • save_model(self, step_id), update_weights(self), onload(self), offload(self), clear_memory(self)

    • 作用: 这些都是管理性操作。
      • save_model: 保存模型(通常由 rank 0 执行)。
      • update_weights: 将 rank 0 的权重广播给其他所有 rank。
      • onload/offload: 对应 Actor 的 wake_up/sleepoffload 将模型从 GPU 移到 CPU,释放显存给其他任务(例如,一个推理服务)。onload 则将其加载回 GPU。这是实现 GPU 资源分时复用的关键。
      • clear_memory: 清理不必要的缓存。
    • 同步: 注意这些方法内部调用了 ray.get()。这意味着当你调用 group.save_model() 时,程序会阻塞,直到所有 Actor 都完成了 save_model.remote() 操作。这为调用者提供了便利。
  • connect(self, critic_group)

    • 作用: 这是为 RLHF 的 Actor-Critic 架构设计的。它将当前 RayTrainGroup(例如,Actor 模型组)的每个 Actor 与 critic_group(Critic 模型组)中对应的 Actor 连接起来。
    • zip(...): zip 操作确保了 rank=i 的 Actor 模型与 rank=i 的 Critic 模型建立连接,这对于 PPO 算法中计算优势函数(advantage)是必需的。
  • set_rollout_manager(self, rollout_manager)

    • 作用: 将一个 rollout_manager 对象(可能也是一个 Ray Actor)的句柄设置给所有的训练 Actor。这可能是为了让训练 Actor 能够从数据生成端(rollout workers)拉取新的训练数据。

4. 工作流程和设计模式总结

  1. 资源预留与调度分离: RayTrainGroup 依赖外部传入的 PlacementGroup,体现了良好的分层设计。上层逻辑负责资源规划,本类负责使用资源执行任务。
  2. 抽象与封装: 它将复杂的分布式环境设置(环境变量、通信)、Actor 创建和调度逻辑完美地封装起来,向用户暴露了简洁的 train, save, init 接口。
  3. 灵活性与可扩展性: 通过后端选择机制,可以轻松切换 Megatron-LMFSDP,未来也可以方便地支持其他训练框架。
  4. 高效的内存管理: 通过 LD_PRELOAD 实现的自动 CPU offload 和手动的 onload/offload 接口,展示了在极限资源条件下训练大模型的先进技术。
  5. 为 RLHF 优化: connect 方法和 role 参数表明这个类是为复杂的 Actor-Critic 训练范式量身定制的。

总之,RayTrainGroup 是一个强大而优雅的工具,它是一个“分布式训练集群”的控制器,极大地降低了在 Ray 上运行大规模、多组件(Actor-Critic)训练任务的复杂度。

您好!您截取的这段代码片段 TrainRayActor = ray.remote(...)(actor_impl)Ray Core 中定义一个 Actor 的经典模式。它将一个普通的 Python 类(这里是 actor_impl,我们假设它就是 MegatronTrainRayActor)转换成一个可以被 Ray 远程实例化和调用的分布式对象。

让我们来详细拆解这个语法的每一个部分:


语法拆解: ray.remote(...)(ClassName)

这实际上是一个两步操作:

  1. ray.remote(...): 这是一个函数调用,它返回一个装饰器

    • 括号里的参数,如 num_gpus=1, runtime_env={...},是 Actor 的选项 (Actor Options)。这些选项告诉 Ray 在创建这个 Actor 的实例时需要满足哪些资源和环境要求。
  2. (...)(ClassName): 上一步返回的装饰器被立即应用在 ClassName(在您的例子中是 actor_impl)上。

    • 这个装饰器会处理 ClassName,生成一个新的类,我们通常称之为 ActorClass。这个新的类包含了所有远程调用的能力。

最终,变量 TrainRayActor 被赋值为这个新生成的 ActorClass

所以,这行代码的功能等同于标准的装饰器语法:

@ray.remote(num_gpus=1, runtime_env={"env_vars": env_vars})
class actor_impl:
    # ... MegatronTrainRayActor 的所有代码 ...

# 此时,actor_impl 已经被转换成了一个 ActorClass
TrainRayActor = actor_impl 

或者,如果 actor_impl 已经在别处定义好了,那么 TrainRayActor = ray.remote(...)(actor_impl) 就是将一个普通的类“注册”成一个可以被 Ray 使用的 Actor 类型。


参数详解

现在我们来看您代码中的具体参数:

  1. num_gpus=1:

    • 含义: 这告诉 Ray 调度器,每创建一个 TrainRayActor 的实例,都必须为它预留 1 个 GPU
    • 作用: 这是 Ray 进行资源管理和调度的核心。Ray 会在集群中寻找一个拥有至少 1 个空闲 GPU 的节点来放置这个 Actor。一旦 Actor 被创建,这个 GPU 就会被该 Actor 独占,直到 Actor 被销毁。
  2. runtime_env={"env_vars": env_vars}:

    • 含义: 这是 Ray 的 运行时环境 (Runtime Environment) 配置。它允许你为 Actor 指定一个隔离的、可复现的运行环境。
    • "env_vars": env_vars: 这部分是说,当 Ray 启动这个 Actor 的进程时,需要设置一些特定的环境变量
    • env_vars: 这应该是一个 Python 字典,包含了你想要设置的环境变量键值对。例如:
      env_vars = {
          "CUDA_VISIBLE_DEVICES": "0", # (Ray 会自动处理这个,这里只是举例)
          "NCCL_DEBUG": "INFO",
          "HTTP_PROXY": "http://your.proxy.server:port"
      }
      
    • 作用: 这非常重要!它确保了 Actor 在一个配置正确的环境中运行。例如,分布式训练需要的 NCCL 相关环境变量、代理设置、或者特定库需要的路径都可以在这里配置,而不会污染主进程或其他 Actor 的环境。

如何使用 TrainRayActor

在执行了 TrainRayActor = ray.remote(...)(actor_impl) 之后,你就可以像下面这样来创建和使用这个 Actor 了:

# 1. 创建 Actor 的实例
#    Ray 会在集群中找到一个有 GPU 的节点来运行它。
#    .remote() 是 Actor 构造函数的远程调用版本。
my_actor_instance = TrainRayActor.remote(
    args=some_args, 
    role="actor", 
    with_ref=True
)

# 2. 远程调用 Actor 的方法
#    .remote() 也是方法调用的远程版本。
#    这个调用是非阻塞的,它会立即返回一个 ObjectRef (一个未来的凭证)。
start_id_ref = my_actor_instance.init.remote(
    args=some_args, 
    role="actor", 
    with_ref=True
)

# 3. 获取结果
#    ray.get() 会阻塞,直到远程方法执行完毕并返回结果。
start_rollout_id = ray.get(start_id_ref)
print(f"Actor initialized. Start rollout ID is: {start_rollout_id}")

# 4. 再次调用其他方法
my_actor_instance.train.remote(rollout_id=0, rollout_data_ref=some_data)

总结

TrainRayActor = ray.remote(num_gpus=1, runtime_env={"env_vars": env_vars})(actor_impl) 这行代码是一个声明和定义

它向 Ray 系统声明:

“嘿,Ray!我这里有一个叫 actor_impl 的 Python 类。请你把它包装成一个叫 TrainRayActor 的分布式 Actor 类型。以后每当我要创建一个 TrainRayActor 实例时,请务必给它分配 1 个 GPU,并且在启动它的进程时,把我在 env_vars 字典里定义的环境变量都给它设置好。”

这是一个将普通 Python 代码无缝转换为分布式组件的强大而简洁的语法,是 Ray 框架的核心魅力之一。

RayTrainGroup、TrainRayActor 和 MegatronTrainRayActor 之间是层级继承和管理关系: 1 2 3

关系概述

  • RayTrainGroup 是 Ray Actor 的组管理器,负责创建和管理多个训练 Actor 实例 4
  • TrainRayActor 是抽象基类,定义了训练 Actor 的通用接口 5
  • MegatronTrainRayActor 继承自 TrainRayActor,是 Megatron-LM 后端的具体实现 3

详细关系

1. 继承关系

TrainRayActor
MegatronTrainRayActor
FSDPTrainRayActor

TrainRayActor 作为抽象基类,定义了 inittrainsave_modelupdate_weights 等抽象方法 5 。MegatronTrainRayActor 实现了这些方法,专门用于 Megatron-LM 分布式训练 6

2. 管理关系

RayTrainGroup 根据后端类型动态选择 Actor 实现:

  • train_backend == "megatron" 时,使用 MegatronTrainRayActor 7
  • 否则使用 FSDPTrainRayActor 8

RayTrainGroup 创建多个 Actor 实例并管理它们的生命周期 9

3. 初始化流程

RayTrainGroup TrainRayActor MegatronTrainRayActor 创建远程 Actor 实例 async_init(args, role, with_ref) super().init(args, role, with_ref) Megatron 特定初始化 返回 start_rollout_id RayTrainGroup TrainRayActor MegatronTrainRayActor

Notes

除了 MegatronTrainRayActor,还有 FSDPTrainRayActor 作为另一个后端实现 10 。RayTrainGroup 通过配置参数动态选择使用哪个具体实现。

Wiki pages you might want to explore:

Citations

File: slime/ray/actor_group.py (L10-27)

class RayTrainGroup:
    """
    A group of ray actors
    Functions start with 'async' should return list of object refs

    Args:
        args (Namespace): Arguments for the actor group.
        num_nodes (int): Number of nodes for this actor group.
        num_gpus_per_node (int): Number of gpus for this actor group.
        pg (PlacementGroup, optional): Placement group to schedule actor on.
            If none, create new placement group automatically. Defaults to None.
        num_gpus_per_actor (float, optional): Number of gpus allocated for each actor.
            If < 1.0, multiple models can share same gpu. Defaults to 1.
        resources (Dict[str, float], optional): Custom resources to allocate for each actor.
            See https://docs.ray.io/en/latest/ray-core/scheduling/resources.html
        num_resources_per_node (int, optional): Number of custom resources to allocate for each node.
            See https://docs.ray.io/en/latest/ray-core/scheduling/resources.html
    """

File: slime/ray/actor_group.py (L78-82)

        backend = self.args.train_backend
        if backend == "megatron":
            from slime.backends.megatron_utils import MegatronTrainRayActor

            actor_impl = MegatronTrainRayActor

File: slime/ray/actor_group.py (L84-87)

        else:
            from slime.backends.fsdp_utils import FSDPTrainRayActor

            actor_impl = FSDPTrainRayActor

File: slime/ray/actor_group.py (L92-106)

        self._actor_handlers = []
        master_addr, master_port = None, None
        for rank in range(world_size):
            actor = TrainRayActor.options(
                num_cpus=num_gpus_per_actor,
                num_gpus=num_gpus_per_actor,
                scheduling_strategy=PlacementGroupSchedulingStrategy(
                    placement_group=pg,
                    placement_group_bundle_index=reordered_bundle_indices[rank],
                ),
            ).remote(world_size, rank, master_addr, master_port)
            if rank == 0:
                master_addr, master_port = ray.get(actor.get_master_addr_and_port.remote())
            self._actor_handlers.append(actor)

File: slime/ray/actor_group.py (L107-112)

    def async_init(self, args, role, with_ref=False):
        """
        Allocate GPU resourced and initialize model, optimzier, local ckpt, etc.
        """
        self.args = args
        return [actor.init.remote(args, role, with_ref=with_ref) for actor in self._actor_handlers]

File: slime/ray/train_actor.py (L28-44)

class TrainRayActor(RayActor):
    def __init__(self, world_size, rank, master_addr, master_port):
        configure_logger()

        self._world_size = world_size
        self._rank = rank
        if master_addr:
            self.master_addr, self.master_port = master_addr, master_port
        else:
            self.master_addr, self.master_port = self._get_current_node_ip_and_free_port(
                start_port=random.randint(20000, 21000)
            )

        os.environ["MASTER_ADDR"] = self.master_addr
        os.environ["MASTER_PORT"] = str(self.master_port)
        os.environ["WORLD_SIZE"] = str(self._world_size)
        os.environ["RANK"] = str(self._rank)

File: slime/ray/train_actor.py (L103-129)

    @abc.abstractmethod
    def sleep(self, tags):
        raise NotImplementedError

    @abc.abstractmethod
    def wake_up(self, tags):
        raise NotImplementedError

    @abc.abstractmethod
    def train(self, rollout_id, rollout_data_ref):
        raise NotImplementedError

    @abc.abstractmethod
    def save_model(self, rollout_id, force_sync=False):
        raise NotImplementedError

    @abc.abstractmethod
    def update_weights(self):
        raise NotImplementedError

    @abc.abstractmethod
    def connect_actor_critic(self, critic_group):
        raise NotImplementedError

    @abc.abstractmethod
    def _get_parallel_config(self):
        raise NotImplementedError

File: slime/backends/megatron_utils/actor.py (L46-46)

class MegatronTrainRayActor(TrainRayActor):

File: slime/backends/megatron_utils/actor.py (L48-157)

    def init(
        self,
        args: Namespace,
        role: str,
        with_ref: bool = False,
    ) -> int | None:
        monkey_patch_torch_dist()

        super().init(args, role, with_ref)

        init(args)

        if is_megatron_main_rank():
            init_tracking(args, primary=False)

        self.prof = TrainProfiler(args)

        # read config and tokenizer serialized to prevent concurrent writing bug.
        for i in range(dist.get_world_size()):
            if i == dist.get_rank():
                self.hf_config = AutoConfig.from_pretrained(args.hf_checkpoint, trust_remote_code=True)
                self.tokenizer = AutoTokenizer.from_pretrained(self.args.hf_checkpoint, trust_remote_code=True)
                if args.megatron_to_hf_mode == "bridge":
                    args.bridge = AutoBridge.from_hf_pretrained(args.hf_checkpoint, trust_remote_code=True)

            dist.barrier(group=get_gloo_group())

        self.train_parallel_config = {
            "dp_size": mpu.get_data_parallel_world_size(with_context_parallel=False),
        }
        dist.barrier(group=get_gloo_group())

        if args.offload_train:
            if (x := args.train_memory_margin_bytes) > 0:
                logger.info(f"Set torch_memory_saver.memory_margin_bytes to {x}")
                torch_memory_saver.memory_margin_bytes = x

        if self.args.debug_rollout_only:
            return 0

        if role == "critic":
            self.args.load = self.args.critic_load
            self.args.save = self.args.critic_save
            self.args.lr = self.args.critic_lr
            self.args.lr_warmup_iters = self.args.critic_lr_warmup_iters

        (self.model, self.optimizer, self.opt_param_scheduler, loaded_rollout_id) = initialize_model_and_optimizer(
            args, role
        )

        if role == "critic":
            if self.args.offload_train:
                self.sleep()
            return

        start_rollout_id = loaded_rollout_id + 1

        self.weights_backuper = TensorBackuper.create(
            source_getter=lambda: named_params_and_buffers(
                self.args,
                self.model,
                convert_to_global_name=args.megatron_to_hf_mode == "raw",
                translate_gpu_to_cpu=not self.args.enable_weights_backuper,
            ),
            single_tag=None if args.enable_weights_backuper else "actor",
        )
        self._active_model_tag: str | None = "actor"
        self.weights_backuper.backup("actor")

        if with_ref:
            self.load_other_checkpoint("ref", args.ref_load)

        if self.args.keep_old_actor:
            # Load old_actor checkpoint
            self.load_other_checkpoint("old_actor", args.load)
            # Create rollout_actor as a copy of current actor
            if args.update_weights_interval == 1:
                self.weights_backuper.backup("rollout_actor")

        if self.args.vocab_size is None:
            self.args.vocab_size = self.tokenizer.vocab_size

        update_weight_cls = UpdateWeightFromTensor if self.args.colocate else UpdateWeightFromDistributed
        self.weight_updater = update_weight_cls(
            self.args,
            self.model,
            weights_getter=lambda: self.weights_backuper.get("actor"),
            model_name=type(self.hf_config).__name__.lower() if self.args.model_name is None else self.args.model_name,
            quantization_config=getattr(self.hf_config, "quantization_config", None),
        )

        # empty cache after initialization
        clear_memory()

        if self.args.offload_train:
            # recover to actor in the end.
            self._switch_model("actor")
            self.sleep()

        self.rollout_engines = None

        self.rollout_data_postprocess = None
        if self.args.rollout_data_postprocess_path is not None:
            from slime.utils.misc import load_function

            self.rollout_data_postprocess = load_function(self.args.rollout_data_postprocess_path)

        self.prof.on_init_end()

        return start_rollout_id

File: slime/backends/fsdp_utils/actor.py (L38-49)

class FSDPTrainRayActor(TrainRayActor):
    """Simplified TrainRayActor for pure HF+FSDP training.

    Responsibilities:
      * Initialize model/tokenizer on rank0 sequentially to avoid race on cache
      * Wrap model with FSDP
      * Provide minimal train / save / update_weights hooks compatible with existing RayTrainGroup

    Weight update strategy:
      * Rank0 gathers state_dict (full) and broadcasts tensor-by-tensor.
      * For small models this is fine; for larger models consider sharded state_dict type.
    """
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐