最近在看 verl 源码时,看到其主要使用了 Ray 实现资源调度和分布式训练的能力,然后看了下其他的强化学习框架,也有很多框架使用了 Ray,感觉 Ray 已经成为了现代强化学习领域实现分布式训练和资源调度的主流选择,所以还是有必要了解下它的一些核心概念和特性,这样看到相关代码时才能大概知道其作用。

Ray 是什么?

“An open source framework to build and scale your ML and Python applications easily.”

Ray 是由加州大学伯克利分校RISELabi开发的,RISELab是大数据和AI领域顶尖的实验室,Spark、vLLM 等也是出自这个实验室,并且这个实验室的成员还创建了Databricks(Spark创始人)、Anyscale(Ray初创成员) 这样的知名公司。

Ray 是一个开源的AI 计算引擎和统一框架,用于轻松构建和扩展机器学习与通用 Python 应用程序。它提供了一套统一的、高性能的分布式计算原语,支持从原型开发到生产部署的全生命周期。

Ray由一个核心分布式运行时(Ray Core)和一组用于简化机器学习计算的人工智能库组成:

核心组件

Ray 采用模块化设计,包含多个高层库,覆盖 ML 全流程:

组件 功能 关键能力
Ray Core 分布式 Python 基础 任务(Tasks)、参与者(Actors)、对象存储(Objects)——构建任意分布式应用
Ray Data 数据加载与预处理 高效、可扩展的数据摄取、转换、批处理(支持文本、图像、LLM 等)
Ray Train 分布式模型训练 支持 PyTorch、TensorFlow、XGBoost、Hugging Face 等主流框架
Ray Tune 任意规模的超参数调优 自动调参、实验管理、支持多种搜索算法(Optuna, HyperOpt 等)
Ray Serve 模型在线服务 低延迟、高并发的模型部署,支持 LLM、多模型组合、动态批处理
Ray RLlib 强化学习 工业级 RL 算法库,支持单智能体/多智能体、离线/在线训练

✅ 所有组件均基于 Ray Core 构建,共享同一套调度、容错、资源管理机制。

Ray Core

Ray Core 是 Ray 分布式计算框架的核心模块,它提供了一组极简但强大的原语(primitives),用于构建和扩展通用的 Python 分布式应用,能让你只需几行代码,就能将普通的 Python 或 Java 函数及类转换为分布式的无状态任务和有状态参与者。主要包含三个核心概念:

  1. Tasks(任务):远程执行的无状态函数。
  2. Actors(参与者):远程执行的有状态类实例。
  3. Objects(对象):存储在分布式对象存储中的数据。

使用 Task

Task 是在 Ray 集群上并行化你的 Python 函数的最简单方式。要创建和运行一个任务:

  1. 使用 @ray.remote 装饰函数,使其成为可远程执行的任务。
  2. 使用 .remote() 调用该函数,而不是进行常规的函数调用
  3. 使用用 ray.get() 从返回的 future 对象获取结果。
# Define the square task.
@ray.remote
def square(x):
    return x * x

# Launch four parallel square tasks.
futures = [square.remote(i) for i in range(4)]

# Retrieve results.
print(ray.get(futures))
# -> [0, 1, 4, 9]

✅ 优势:自动并行化、跨节点调度、容错。

使用 Actors

Task 是无状态的,而 Actor 则是有状态的,Actor 在方法调用之间会保持其内部状态。当你实例化一个Ray Actor 时:

  1. Ray 会在集群中的某个位置启动一个专用的工作进程
  2. Actor的方法在那个特定的工作进程上运行,并且能够访问和修改其状态
  3. Actor 按照接收方法调用的顺序串行执行这些调用,以保持一致性

要创建和实例化 Actor:

  1. @ray.remote 装饰类,创建远程 Actor 实例。
  2. 每个 Actor 在独立进程中运行,维护内部状态。
  3. 方法调用也使用 .remote(),按提交顺序串行执行。
# Define the Counter actor.
@ray.remote
class Counter:
    def __init__(self):
        self.i = 0

    def get(self):
        return self.i

    def incr(self, value):
        self.i += value

# Create a Counter actor.
c = Counter.remote()

# Submit calls to the actor. These calls run asynchronously but in
# submission order on the remote actor process.
for _ in range(10):
    c.incr.remote(1)

# Retrieve final actor state.
print(ray.get(c.get.remote()))
# -> 10

✅ 适用场景:状态管理、模型服务、参数服务器等。

Objects

Ray的分布式对象存储能高效地跨集群管理数据。在Ray中处理对象主要有三种方式:

  • 隐式创建:当 Task 和 Actor 返回值时,这些值会自动存储在 Ray 的分布式对象存储中,并返回可在之后被检索的对象引用。
  • 显式创建:使用 ray.put() 将对象直接放入存储中。
  • 传递引用:可以将对象引用传递给其他 Task 和 Actor,这样可以避免不必要的数据复制,并支持延迟执行。
import numpy as np

# Define a task that sums the values in a matrix.
@ray.remote
def sum_matrix(matrix):
    return np.sum(matrix)

# Call the task with a literal argument value.
print(ray.get(sum_matrix.remote(np.ones((100, 100)))))
# -> 10000.0

# Put a large array into the object store.
matrix_ref = ray.put(np.ones((1000, 1000)))

# Call the task with the object reference as an argument.
print(ray.get(sum_matrix.remote(matrix_ref)))
# -> 1000000.0

总结

概念 作用 关键 API
Task 无状态并行计算 @ray.remote, .remote(), ray.get()
Actor 有状态远程对象 @ray.remote class, 方法 .remote()
Object 分布式共享数据 ray.put()

Ray Core 的设计哲学:保持 API 极简,但组合能力极强,可构建从简单并行脚本到复杂分布式系统的各类应用。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐