设计背景

Ray 是一个为通用分布式计算而设计的开源框架,其设计背景主要源于以下几个方面的需求和挑战:

人工智能与机器学习工作负载的复杂性增长
随着强化学习(Reinforcement Learning)、超参数调优(Hyperparameter Tuning)、模型服务(Model Serving)等 AI 应用的发展,传统的大数据处理框架(如 Hadoop、Spark)在应对低延迟、高并发、异构任务调度等方面显得力不从心。这些新兴 AI 工作负载通常具有以下特点:

  • 任务粒度细(毫秒级甚至微秒级)
  • 动态任务生成(例如强化学习中的 rollout 会动态触发训练任务)
  • 混合计算模式(CPU/GPU、同步/异步、批处理/流处理)
  • 对低延迟和高吞吐有严格要求

现有系统无法满足灵活、高效的分布式执行需求
传统的分布式系统(如 Spark)基于批处理模型,任务调度开销大,难以支持细粒度、动态的任务图(task graph)。而高性能计算(HPC)或消息传递接口(MPI)虽然性能高,但编程模型复杂、容错能力弱、不易扩展到大规模异构集群。

需要统一的编程模型支持多种 AI 场景
研究人员和工程师希望有一个统一的框架,既能用于分布式训练、也能用于推理服务、自动调参、模拟仿真等场景,而无需为每种任务使用不同的系统。Ray 的设计目标之一就是提供一个“通用”的分布式运行时,通过简单的 Python API 实现复杂的分布式应用。

来自伯克利 RISELab 的研究驱动
Ray 最初由加州大学伯克利分校的 RISELab(前身为 AMPLab,也是 Spark 的诞生地)于 2017 年左右开始研发。RISELab 的研究重点是“实时智能与安全”,因此 Ray 被设计为支持低延迟、高吞吐、可扩展且容错的分布式执行引擎,特别适合构建实时 AI 系统。

对易用性和可扩展性的双重追求
Ray 强调“简单即强大”:用户只需通过 @ray.remote 装饰器即可将普通 Python 函数或类变为分布式可执行单元,底层自动处理任务调度、对象存储、故障恢复等复杂问题。同时,Ray 的架构(如基于共享内存的对象存储、去中心化的任务调度)使其能够线性扩展到数千节点。

Ray 的设计背景是为了解决现代 AI 和机器学习应用中对低延迟、高并发、动态任务调度、异构资源利用和易用性的综合需求,填补了传统大数据框架与高性能计算系统之间的空白,成为一个面向未来智能应用的通用分布式计算平台。

项目定位与目标

解决什么问题?

细粒度任务调度与低延迟执行

  • 传统大数据框架(如 Spark)面向批处理,任务调度开销大(通常数百毫秒),难以支持毫秒级甚至微秒级的任务。
  • Ray 支持细粒度、动态生成的任务图(task graph),适用于强化学习中的 rollout、超参搜索中的 trial 等场景。

动态与异构工作负载的支持不足

  • AI 工作流常包含混合任务:CPU 计算、GPU 推理、模拟环境、模型训练、在线服务等。
  • Ray 能统一调度异构资源(CPU/GPU/TPU)并支持动态任务依赖(例如一个任务的结果决定后续启动哪些任务)。

缺乏统一的编程模型

  • 过去开发者需为不同场景(训练、调参、部署)使用不同工具(如 Spark + Kubernetes + 自定义调度器)。
  • Ray 提供统一的运行时和 API,覆盖从实验到生产的全生命周期。

容错性与可扩展性之间的权衡

  • 高性能系统(如 MPI)通常牺牲容错;容错系统(如 Hadoop)牺牲性能。
  • Ray 在保证低延迟和高吞吐的同时提供任务级容错(自动重试失败任务)。

Python 生态的分布式能力缺失

  • Python 是 AI 主流语言,但原生不支持高效分布式执行。
  • Ray 以Python 为第一语言,通过极简 API(如 @ray.remote)实现透明分布式化。

目标用户是谁?
AI/ML 研究人员与工程师

  • 需要快速实验强化学习、超参数优化、大规模模拟等场景。
  • 希望用 Python 编写分布式代码而无需深入底层系统细节。
    MLOps 与平台团队
  • 构建统一的 AI 平台,整合训练、调参、推理、监控等环节。
  • 利用 Ray 的可扩展性和资源隔离能力部署生产级服务。

数据科学家与应用开发者

  • 希望将单机脚本轻松扩展到多机集群,提升计算效率。
  • 使用 Ray Data、Ray Serve 等高级库进行数据处理与模型部署。

企业与云服务商

  • 如 Amazon(SageMaker)、Google(Vertex AI)、Microsoft(Azure ML)等已集成 Ray。
  • 用于构建托管 AI 服务,降低客户使用分布式 AI 的门槛。

核心设计理念是什么?
Task-Based Execution Model(基于任务的执行模型)

  • 将计算抽象为无状态的任务(tasks)有状态的 Actor(actors)
  • 任务是函数调用,Actor 是可跨节点共享的状态对象(如模型服务器、环境模拟器)。

Shared-Memory Object Store(共享内存对象存储)

  • 使用 Apache Arrow 格式在节点内通过零拷贝共享内存传递数据,极大减少序列化/反序列化开销。
  • 跨节点通过高效 RPC 传输,支持大对象缓存与复用。

Decentralized, Scalable Scheduling(去中心化调度)

  • 采用两级调度架构:全局调度器(Global Scheduler) + 本地调度器(Local Scheduler)。
  • 支持线性扩展至数千节点,调度延迟低(<1ms)。

Language-First, Developer-Centric(以语言和开发者为中心)

  • 不要求用户学习新 DSL 或配置复杂集群。
  • 通过装饰器(@ray.remote)将普通 Python 函数/类“一键分布式化”。

Unified Runtime for AI Workflows(AI 工作流的统一运行时)

  • 构建上层库(如 Ray Tune、Ray RLlib、Ray Serve)共享同一底层运行时,避免系统碎片化。

整体架构概览

主要组件有哪些?它们如何协作?

Ray 的架构设计围绕“高性能、易用性、可扩展性”三大目标,其系统由多个协同工作的核心组件构成。这些组件共同提供了一个统一的分布式运行时,支持从细粒度任务到有状态服务的各类 AI 工作负载。以下是 Ray 的主要组件及其协作方式:

Driver(驱动程序)
  • 作用:用户编写的 Python 程序入口(即主进程),负责提交任务、创建 Actor、获取结果。
  • 特点
    • 每个 Ray 应用启动时会有一个 Driver。
    • 不执行计算任务,只负责协调和调度请求。
  • 协作:向全局控制层(GCS)注册自身,并通过本地调度器提交任务。
Worker(工作进程)
  • 作用:实际执行任务(tasks)或托管 Actor 的进程。
  • 类型
    • 无状态 Worker:执行普通函数任务(@ray.remote 函数)。
    • Actor Worker:运行有状态的 Actor 对象(@ray.remote 类实例)。
  • 特点
    • 每个节点可启动多个 Worker。
    • 自动从对象存储读取输入、将输出写回对象存储。
  • 协作:从本地调度器领取任务,执行后将结果存入对象存储,并通知调度器任务完成。
Object Store(对象存储)
  • 作用:高效存储和共享任务输入/输出数据(称为“对象”,Objects)。
  • 关键技术
    • 基于 Apache Arrow 内存格式。
    • 节点内使用 共享内存(Plasma),实现零拷贝数据访问。
    • 跨节点通过 gRPC + 自定义传输协议 复制对象。
  • 协作
    • Worker 执行任务前从 Object Store 读取依赖对象。
    • 任务结果直接写入本地 Object Store。
    • 若其他节点需要该对象,自动触发跨节点拉取。

优势:避免重复序列化,大幅提升数据密集型任务性能。

Scheduler(调度器)

Ray 采用两级调度架构

a) Global Scheduler(全局调度器)
  • 作用:决定任务应分配到哪个节点(基于资源可用性、位置亲和性等)。
  • 部署:通常与 Head Node 上的 GCS 共置。
b) Local Scheduler(本地调度器,现称 Core Worker Scheduler)
  • 作用:在本节点内将任务分发给空闲 Worker。

  • 特点:轻量、低延迟(微秒级)。

  • 协作流程

    1. Driver 提交任务 → Local Scheduler(若本地资源不足)→ 转发给 Global Scheduler。
    2. Global Scheduler 选择目标节点 → 将任务路由到该节点的 Local Scheduler。
    3. Local Scheduler 分配给 Worker 执行。

支持动态资源感知(CPU/GPU/自定义资源)和负载均衡。

Global Control Store (GCS)
  • 作用:Ray 的“元数据中心”,存储集群的全局状态。
  • 管理内容
    • 节点注册与心跳(Node Manager 注册信息)
    • Actor 生命周期(位置、状态)
    • 任务元数据(DAG 依赖、状态)
    • 对象位置映射(哪些节点持有某对象)
  • 实现:早期基于 Redis,新版本(Ray 2.0+)逐步迁移到更高效的内部存储(如 GCS Server + Raft 共识)。
  • 协作
    • 所有节点(Head + Workers)定期向 GCS 报告状态。
    • 调度器查询 GCS 获取集群视图。
    • 容错时用于重建失败 Actor 或任务。
Raylet(节点管理器)
  • 作用:每个非 Head 节点上的本地守护进程,整合了 Local Scheduler 和 Object Store 的控制逻辑。
  • 功能
    • 管理本节点的资源(CPU/GPU/内存)
    • 启动/监控 Worker 进程
    • 与 GCS 通信汇报状态
    • 协调本地任务调度与对象存储
  • 协作
    • 向 GCS 注册节点能力。
    • 接收来自其他节点的任务请求。
    • 触发对象跨节点传输。

Raylet 是 Ray 实现去中心化、可扩展性的关键组件。

Core Worker(核心工作库)
  • 作用:嵌入在每个 Python/Java/C++ 进程中的客户端库,提供 Ray API 的底层实现。
  • 功能
    • 管理任务提交、对象引用(ObjectRef)
    • 与本地 Raylet 通信
    • 执行任务或 Actor 方法
  • 协作
    • Driver 和 Worker 都包含 Core Worker。
    • 将用户代码(如 f.remote())转化为内部任务描述并提交。

画一张简单的架构图(可用文字描述)

Object Store Worker Process GCS (Global Control Store) Raylet (Local Scheduler + Node Manager) Core Worker Driver (User Code) Object Store Worker Process GCS (Global Control Store) Raylet (Local Scheduler + Node Manager) Core Worker Driver (User Code) alt [Local resources available] [Not enough local resources] 1. Call f.remote() 2. Submit task (check local resources) 3a. Assign task to Worker 3b. Request global scheduling decision 4. Return target node info 5. Assign task to Worker (on selected node) 6. Read input objects (if any) 7. Execute task function 8. Write result object 9. Register object location 10. Notify result ready (via reference) 11. Return ObjectRef 12. Call ray.get(ref) 13. Query object location 14. Return node holding object 15. Fetch result from Object Store (local or remote) 16. Return data 17. Return result to user

Cluster Control Plane (Head Node)

Node-Level Services (per worker node)

Client/Runtime Layer (per process)

User Layer

1. 调用 f.remote()
2. 提交任务 & 查询资源
3. 上报状态 / 查询全局视图
4. 调度任务
5. 管理对象生命周期
6. 读输入 / 写输出
7. 注册对象位置
8. ray.get(): 查询位置并拉取
9. (间接) 查询 Actor/对象位置

Driver(User Python Code) - 提交任务
- 获取结果

Core Worker(Embedded Library)
- 封装 remote() 调用
- 管理ObjectRef
- 与 Raylet 通信

Raylet(Node Manager + Local Scheduler)
- 管理 CPU/GPU 资源
- 启动 Worker
- 本地任务调度\n- 与 GCS 同步状态

Object Store(Shared Memory via Plasma)
- 存储任务输入/输出
- 零拷贝访问(同节点)
- 跨节点对象传输

Worker Process(Task Executor / Actor Host)
- 执行 @ray.remote 函数
- 运行 Actor 实例
- 读写 Object Store

GCS(Global Control Store)
- 全局元数据存储
- 节点注册与心跳
- Actor 位置记录
- 对象位置目录
- 全局调度决策支持

全局概览图

Worker Node (e.g., Node X)

Head Node

User Entry

Raylet Internals

1. Submit task via Core Worker

2a. Local resources sufficient?

3a. Schedule locally

2b. Local resources insufficient?

4. Query cluster state from GCS
5. Return node/resource info
6. Assign task to target Raylet
7. Schedule on Worker
8. Read inputs / Write outputs
9. Register object location in GCS
10. ray.get(ref): fetch result

Heartbeat & State Report

Driver
(User Code)

- 提交任务 f.remote() - 调用 ray.get() 获取结果

GCS(Global Control Store)
- 节点/Actor/对象注册表
- 集群元数据存储

Global Scheduler(Head Node Only)
- 跨节点调度决策
- 基于 GCS 资源视图分配任务

Raylet(Per-Node Daemon)

Local Scheduler
- 接收本地/远程任务
- 分配给 Worker

Object Store
- 存储任务输入/输出
- 零拷贝共享(同节点)

Worker Management
- 启动 Worker Process
- 执行 Task / Actor

关键设计与实现机制

选择 2–3 个最具代表性的设计点深入分析

🔹 设计点 1:GCS —— 全局控制存储(Global Control Store)

  • 问题背景
    在大规模分布式集群中,如何实现:

    • 节点动态加入/退出的自动发现?
    • Actor 和任务结果对象的全局定位?
    • 调度器获取一致的集群资源视图(CPU/GPU/自定义资源)?
  • 解决方案
    GCS 作为集群的统一元数据中心,提供以下核心服务:

    • 注册表:记录所有活跃节点、Actor 实例、对象(Object)的位置信息;
    • 资源目录:维护每个节点的资源总量与已分配量;
    • 状态同步:支持任务/Actor 的容错重建(通过元数据恢复执行上下文)。
  • 关键技术演进

    • Ray < 2.0:基于 Redis 作为后端存储,GCS 逻辑运行在 Head Node;
    • Ray ≥ 2.0:引入 原生 GCS Server,支持 Raft 共识协议(为未来高可用做准备),逐步减少对 Redis 的依赖;
    • 通信协议:通过 gRPC 与各节点 Raylet 同步心跳和状态。
  • 优点

    • 提供强一致的全局视图,支撑智能调度与容错;
    • 解耦控制平面与数据平面(GCS 不传数据,只管“位置”);
    • 支持动态扩缩容和异构资源管理。
  • 缺点 / 挑战

    • 当前开源版 GCS 仍为单点(Head Node 上),存在单点故障风险;
    • 元数据规模随集群增长而膨胀,可能成为性能瓶颈;
    • Raft HA 模式尚未在社区版默认启用,生产环境需额外保障。

🔹 设计点 2:Raylet —— 节点级自治运行时

  • 问题背景
    如何在保持全局协调的同时,实现低延迟、去中心化的任务执行?如何让每个节点独立管理资源、调度任务、处理故障?

  • 解决方案
    每个节点部署一个 Raylet 守护进程,作为本地“微型操作系统”,集成三大核心能力:

    • 本地调度器(Local Scheduler):接收任务并分配给空闲 Worker;
    • 节点资源管理器:跟踪 CPU/GPU/内存使用,上报 GCS;
    • 对象存储协调器:管理本地 Object Store 的读写与跨节点拉取。
  • 关键技术

    • 两级调度架构:本地快速调度 + 全局后备协调;
    • 资源抽象模型:支持自定义资源(如 {"GPU": 1, "special_hw": 1});
    • 与 Core Worker 的 IPC 通信:通过 Unix Domain Socket 或 TCP 高效交互。
  • 优点

    • 低调度延迟(微秒级),适合细粒度任务;
    • 去中心化:即使 GCS 短暂不可用,本地任务仍可执行;
    • 资源隔离:每个节点独立管理,避免全局锁竞争。
  • 缺点

    • 每个节点需常驻 Raylet 进程,带来固定资源开销(约 100–300MB 内存);
    • 调度策略相对简单(主要基于资源可用性),不如 Kubernetes 灵活;
    • 跨节点任务依赖仍需 GCS 协调,无法完全脱离中心控制。

🔹 设计点 3:Object Store —— 高性能分布式对象存储

  • 问题背景
    AI 工作负载常涉及大量中间数据(如 rollout 样本、梯度、模型参数),如何:

    • 避免重复序列化/反序列化?
    • 实现同节点多任务零拷贝共享
    • 高效支持跨节点数据拉取?
  • 解决方案
    每个节点部署一个 基于共享内存的对象存储,特点包括:

    • 使用 Apache Arrow 内存格式 存储不可变对象;
    • 同节点内通过 共享内存(Shared Memory) 实现零拷贝访问;
    • 跨节点通过 异步 pull 模型 拉取对象(由请求方主动触发)。
  • 关键技术

    • Plasma(早期)→ Arrow Shared Memory(当前):统一内存布局;
    • 引用计数 + 自动驱逐(Spilling):内存不足时 spill 到磁盘;
    • 对象位置缓存:Core Worker 本地缓存对象位置,减少 GCS 查询;
    • 批量传输优化:大对象分块传输,避免网络阻塞。
  • 优点

    • 极大降低数据传输开销,尤其适合迭代式 AI 计算;
    • 天然支持数据复用(多个任务消费同一对象无需复制);
    • 与 Python 对象无缝集成(通过 ray.put() / ray.get())。
  • 缺点

    • 内存敏感:大对象或高并发易导致 OOM,需合理配置 object_store_memory
    • 跨节点传输仍依赖网络带宽,大模型场景可能成为瓶颈;
    • 不支持对象更新(Immutable Design),不适合频繁修改的场景。

这三者共同构成了 Ray “去中心化执行 + 中心化协调 + 高效数据共享” 的独特架构,特别适合现代 AI 工作负载

我的收获与启发

把“学到的东西”转化为“我能用的东西”

启发 我可以怎么应用到实际工作中?
基于共享内存的对象存储 + 零拷贝访问 在构建高性能 Python 数据处理服务时,可借鉴 Ray 的思路:
• 使用 Apache Arrow(而非 pickle)作为内存数据格式,实现跨进程/线程零拷贝共享;
• 对高频复用的中间结果(如特征向量、预处理数据),缓存在共享内存中,避免重复序列化开销;
• 若无法引入 Ray,可用 multiprocessing.shared_memory + Arrow 手动实现轻量级对象缓存。
异步 Pull 模型 + 按需加载 在微服务或分布式任务系统中:
• 设计数据依赖时采用“懒加载”策略,仅在消费者真正需要时才触发数据传输;
• 通过全局元数据服务(如 etcd/Redis)记录数据位置,解耦生产者与消费者;
• 避免提前广播大数据,节省带宽。
去中心化调度 + 节点自治(Raylet) 在边缘计算或混合云场景:
• 让每个节点具备本地调度能力,减少对中心控制平面的依赖;
• 本地优先执行任务,仅在资源不足时请求全局协调,提升响应速度与容错性。

延伸思考(可选)

- 如果让你改进 Ray,你会做什么?

  1. 智能网络感知调度

    当前 Ray 调度器主要基于 CPU/GPU 资源,忽略网络拓扑。可扩展为:
    • 自动探测节点间带宽/延迟(如通过心跳包测速);
    • 对于大对象依赖的任务,优先调度到同机架/同可用区的节点;
    • 对网络较差的节点(如跨云、边缘设备),自动限制其接收大对象任务,或启用压缩传输。

  2. 增强 GCS 高可用与持久化

    开源版 GCS 仍是单点。可:
    • 默认启用 Raft 多副本模式;
    • 支持将元数据快照持久化到 S3 或数据库,实现 Head 节点完全故障恢复。

  3. 对象存储的分层存储(Tiered Storage)

    当前 Spilling 仅支持内存 → 磁盘。可扩展为:
    • 热数据:内存(Object Store)
    • 温数据:本地 SSD
    • 冷数据:远程对象存储(如 S3)
    并自动根据访问频率迁移,支撑超大规模中间数据。

  4. 失败任务的数据血缘追踪

    当 Worker 崩溃导致对象丢失,Ray 会重试任务,但不记录数据血缘。可集成类似 LineageStore 的机制,快速重建丢失对象,而非从头执行整个 DAG。

- Ray 不适合什么场景?

场景 原因 替代方案建议
强一致性事务处理 Ray 的对象是不可变的,无 ACID 保证,不适合银行转账等场景 使用传统数据库(PostgreSQL, Spanner)
超低延迟实时流处理(<1ms) 异步 pull 模型有调度+网络延迟,无法满足硬实时要求 使用 Flink、Kafka Streams 或专用 DSP
长时间运行的有状态服务(非 Actor 模式) 虽然 Actor 可长期运行,但 Ray 本身不是为通用 Web 服务设计 使用 FastAPI + Kubernetes,或 Ray Serve(仅限模型推理)
极小规模单机任务 Ray 启动开销(约 1–2 秒)和内存占用(Head + Raylet)过高 直接用 Python 多进程/线程或 asyncio
需要精细网络 QoS 控制的环境 Ray 不提供带宽限速、优先级队列等网络控制 在 Kubernetes 层面通过 CNI 插件实现

总结

Ray 是为 动态、细粒度、高并发的 AI/ML 工作负载量身打造的运行时,其 Object Store + 异步 Pull + 去中心化调度 架构极具启发性。但在借鉴时,应聚焦其内存模型与调度哲学,而非简单套用序列化方式。同时,明确其边界——不是万能分布式框架,而是 AI 时代的“分布式胶水”

参考资料

  • 官方文档链接
  • GitHub 仓库
  • 推荐阅读文章或视频

对象传输和发现解析

为什么需要“异步 pull 模型”?—— 设计动机

在分布式系统中,任务 A 在节点 X 产生一个对象 O,任务 B 在节点 Y 需要消费 O。如何把 O 从 X 传到 Y?

传统方案(如 Spark)常采用 push 模式:生产者(X)主动将数据推送给所有消费者(Y)。
但 Ray 面向的是 动态、细粒度、高并发的 AI 任务,具有以下特点:

  • 消费者可能尚未创建(例如强化学习中 rollout 动态触发训练任务);
  • 一个对象可能被多个未知消费者使用;
  • 网络带宽宝贵,应避免提前推送未被请求的数据

因此,Ray 选择 pull 模型只有当消费者明确需要对象时,才触发拉取

核心思想:“按需拉取,懒加载”(Lazy Fetching)

异步 Pull 模型的工作原理

1. 对象标识与位置注册
  • 每个对象由全局唯一的 Object ID 标识(如 ffffffffffffffffffffffff010000c801000000)。
  • 当 Worker 在节点 X 的 Object Store 中创建对象 O 后:
    • Object Store 通知本地 Raylet
    • Raylet 将 (Object ID → Node X) 的映射注册到 GCS
2. 消费者发起请求
  • 当另一个节点 Y 上的任务调用 ray.get(object_ref) 时:
    • 节点 Y 的 Core Worker 查询本地缓存是否已有该对象;
    • 若无,则向本地 Raylet 发起“获取对象 O”的请求。
3. Raylet 触发异步拉取
  • 节点 Y 的 Raylet 向 GCS 查询 “Object O 存在于哪些节点?”;
  • GCS 返回一个或多个持有该对象的节点列表(如 [Node X, Node Z]);
  • Raylet 选择一个源节点(通常基于网络拓扑、负载等策略),并向其 Raylet 发送 Pull Request
4. 异步数据传输
  • 源节点(X)的 Raylet 从本地 Object Store 读取对象 O;
  • 通过 gRPC 流式传输 将 O 发送给目标节点 Y;
  • 目标节点 Y 的 Raylet 接收数据后,写入本地 Object Store
  • 整个过程不阻塞消费者线程ray.get() 在后台等待,直到对象就绪。
5. 通知与就绪
  • 对象写入 Y 的 Object Store 后,触发 就绪通知
  • Core Worker 被唤醒,ray.get() 返回数据;
  • 同时,Y 的 Raylet 向 GCS 注册:“我也持有对象 O”,供后续消费者复用。

关键:拉取是异步的、非阻塞的、按需触发的

关键技术机制

对象位置缓存(Location Caching)
  • 每个节点的 Raylet 会缓存常用对象的位置信息,减少对 GCS 的查询压力。
  • 缓存通过 GCS 的 订阅机制(Pub/Sub) 自动更新(如对象被驱逐时通知)。
多副本支持(Multiple Locations)
  • 一个对象可存在于多个节点(如广播场景);
  • Pull 时可从最近或最空闲的副本拉取,提升吞吐。
批量与流式传输
  • 大对象被切分为 chunks,通过 gRPC 流式发送,避免内存峰值;
  • 支持 并行拉取多个对象,提高带宽利用率。
背压与流控(Backpressure)
  • 若接收方 Object Store 内存不足,会暂停拉取,防止 OOM;
  • 结合 Spilling(溢出到磁盘) 机制缓解内存压力。
引用计数与生命周期
  • 对象在 Object Store 中通过 引用计数 管理生命周期;
  • 当所有引用(包括远程拉取中的)释放后,对象被自动回收。

一个完整流程示例

# Node A
@ray.remote
def produce():
    return np.random.rand(1000, 1000)  # 大数组

# Node B
@ray.remote
def consume(data):
    return data.sum()

# Driver (on Node C)
obj_ref = produce.remote()          # 任务在 Node A 执行,对象存于 A
result = consume.remote(obj_ref)    # 任务调度到 Node B,B 需要 obj_ref

执行流程

  1. produce 在 Node A 完成,对象 O 存入 A 的 Object Store;
  2. A 的 Raylet 向 GCS 注册:O ∈ {A};
  3. consume 被调度到 Node B;
  4. B 的 Worker 执行 consume,发现需要 O;
  5. B 的 Raylet 查询 GCS → 得知 O 在 A;
  6. B 的 Raylet 向 A 的 Raylet 发起 Pull Request;
  7. A 通过 gRPC 将 O 异步发送给 B;
  8. B 将 O 存入本地 Object Store;
  9. consume 读取 O 并计算,返回结果。

整个过程中,Driver 和 Worker 均无需关心“数据在哪”、“怎么传”,完全透明。

优势 vs 局限

优势 局限 / 挑战
按需传输:避免无效网络开销 首次访问延迟:pull 引入额外 RTT
解耦生产者与消费者:生产者无需知道谁会消费 GCS 成为元数据瓶颈:高频查询可能压力大
天然支持多消费者:每个消费者独立拉取 大对象拉取可能阻塞网络(需配合流控)
易于扩展:新增节点自动参与 pull 网络 不适用于实时流场景(pull 是请求驱动,非 push 实时)

与其他系统的对比

系统 数据传输模型 适用场景
Ray 异步 Pull 细粒度任务、动态 DAG、AI 工作流
Spark Push(Shuffle) 批处理、静态 DAG
Dask Pull + Caching 中等粒度任务,Python 生态
MPI 显式 Send/Recv HPC,程序员手动管理通信

总结

Ray 的 异步 pull 模型 是其面向动态、细粒度、高并发 AI 任务的核心设计之一。它通过 “懒加载 + 按需拉取 + 共享内存 + 异步传输” 的组合,在保证易用性的同时,实现了高性能和资源效率的平衡。虽然在首次访问时有轻微延迟,但在典型 AI 工作负载(如多次迭代、数据复用)中,其优势远大于代价。

逻辑资源隔离和多资源协同调度

逻辑隔离:不是物理隔离,而是“资源记账 + 调度约束”

✅ 什么是“逻辑隔离”?

Ray 不直接控制硬件,而是通过以下方式实现“软隔离”:

  • 资源记账(Resource Accounting):每个节点维护一个资源池(如 {"CPU": 32.0, "GPU": 8});
  • 调度准入控制(Admission Control):任务只有在资源足够时才被调度;
  • 运行时无强制限制:任务一旦启动,可使用任意 CPU/GPU(依赖 OS/框架隔离)。

📌 关键:逻辑隔离 = “你只能申请这么多,但用了多少我们不管”。

多资源协同:统一资源模型支持任意组合

Ray 将所有资源视为 键值对(key-value),支持 CPU + GPU + 自定义资源 的任意组合。

1. 资源类型
类型 示例 特性
内置资源 "CPU", "GPU" 自动探测,可分数(CPU)或整数(GPU)
自定义资源 "TPU", "FPGA", "license" 用户定义,整数单位
2. 任务/Actor 声明多资源
@ray.remote(
    num_cpus=4,          # 4 个逻辑 CPU
    num_gpus=2,          # 2 张 GPU
    resources={"special_hw": 1}  # 1 个自定义硬件
)
def distributed_train():
    ...

协同含义:该任务必须同时满足这三类资源才可调度。

调度机制:如何实现多资源协同分配?

Ray 采用 两级调度 + 全局资源视图 实现多资源协同:

步骤 1:任务提交
  • Driver 提交任务,携带资源需求 {CPU:4, GPU:2, special_hw:1}
步骤 2:本地调度尝试
  • 本地 Raylet 检查本机剩余资源:
    available = {"CPU": 8.0, "GPU": 1, "special_hw": 1}
    
  • GPU:1 < 2本地拒绝
步骤 3:全局调度介入
  • 本地 Raylet 向 Head Node 的 Global Scheduler 请求;
  • Global Scheduler 查询 GCS 获取全集群资源快照:
    Node A: {"CPU": 16, "GPU": 4, "special_hw": 1}
    Node B: {"CPU": 32, "GPU": 8, "special_hw": 0}
    
  • 匹配规则:找一个节点满足 所有资源 ≥ 请求
  • 选择 Node A(唯一满足 special_hw:1 的节点)。
步骤 4:资源预留与执行
  • GCS 标记 Node A 的资源为“已分配”:
    Node A allocated: {"CPU":4, "GPU":2, "special_hw":1}
    
  • 任务被路由到 Node A 的 Raylet;
  • Raylet 启动 Worker,设置环境变量(如 CUDA_VISIBLE_DEVICES=0,1);
  • 任务开始执行。

🔁 释放:任务完成后,资源自动归还。

典型案例:需要 10 个 GPU 的分布式训练

假设你要运行一个 需要 10 张 GPU 的 PyTorch DDP 训练任务,而单机最多 8 GPU。

方案 1:单 Actor 跨节点(不推荐)
  • ❌ Ray 不支持单个任务跨节点分配 GPU
    @ray.remote(num_gpus=10) 会因无单机满足而永远 pending
✅ 方案 2:多 Actor 协同(Ray 的标准做法)
设计思路:
  • 将 10 GPU 任务拆分为 10 个 Worker Actor,每个占 1 GPU;
  • 通过 Ray 的 Actor 引用 + NCCL/GLOO 通信 实现分布式训练。
代码示例:
@ray.remote(num_gpus=1)
class Trainer:
    def __init__(self, rank):
        self.rank = rank
        # 初始化 PyTorch DDP
        os.environ["MASTER_ADDR"] = ray.get(ray.nodes()[0]["NodeManagerAddress"])
        os.environ["MASTER_PORT"] = "12345"
        dist.init_process_group("nccl", rank=rank, world_size=10)

    def train(self, data_shard):
        # 执行分布式训练 step
        return loss

# 启动 10 个 Trainer(自动跨节点调度)
trainers = [Trainer.remote(rank=i) for i in range(10)]

# 并行执行
losses = ray.get([t.train.remote(shard) for t, shard in zip(trainers, data_shards)])
调度过程:
  1. Ray 遍历集群,找到 至少 10 张 GPU 的总和(如 2 节点 × 8 GPU = 16 > 10);
  2. 将 10 个 Trainer 分配到不同节点(如 Node A: 6 个, Node B: 4 个);
  3. 每个 Trainer 独占 1 GPU,通过 CUDA_VISIBLE_DEVICES 隔离;
  4. 应用层(PyTorch)负责跨节点通信(需确保网络互通)。

优势

  • 充分利用集群 GPU 资源;
  • 自动处理节点异构(如部分节点有 A100,部分有 V100);
  • 容错:若一个 Trainer 失败,可重建并恢复训练。

高级协同:资源亲和性与放置组(Placement Groups)

对于 强耦合任务(如 AllReduce 需要低延迟网络),Ray 提供 Placement Group 实现资源捆绑分配

示例:10 GPU + 低延迟要求
# 创建一个 placement group,要求 10 个 GPU 在同一节点(或同一 rack)
pg = ray.util.placement_group(
    bundles=[{"GPU": 1}] * 10,   # 10 个 bundle,每个 1 GPU
    strategy="STRICT_PACK"       # 必须在同一节点
)

# 等待资源就绪
ray.get(pg.ready())

# 在 placement group 中启动 Actors
trainers = [
    Trainer.options(placement_group=pg).remote(rank=i)
    for i in range(10)
]
策略说明:
策略 行为 适用场景
STRICT_PACK 所有 bundle 必须在同一节点 单机多卡训练
PACK 尽量打包,允许跨节点 默认行为
SPREAD 尽量分散到不同节点 容错优先
STRICT_SPREAD 必须分散到不同节点 高可用服务

Placement Group 是 Ray 实现“多资源协同+拓扑感知”的关键抽象

容错与弹性扩展

  • 任务失败:Ray 自动重试(可配置),重建 Actor 并重新分配资源;
  • 节点故障:GCS 检测心跳丢失,标记资源为不可用,后续任务避开该节点;
  • 动态扩缩容:新节点加入后,自动注册资源,Global Scheduler 立即可用。

总结:Ray 如何做到逻辑隔离与多资源协同?

能力 实现机制 用户价值
逻辑隔离 资源记账 + 调度准入控制 防止过载,保证任务可调度性
多资源协同 统一资源模型 + 全局调度 支持 CPU/GPU/自定义资源任意组合
跨节点 GPU 利用 多 Actor 模式 + Placement Group 突破单机 GPU 限制
拓扑感知 Placement Group 策略 优化通信性能(同节点/同 rack)
弹性容错 GCS 状态管理 + 自动重试 保障长时间训练任务可靠性

核心思想
“把复杂留给自己,把简单留给用户” —— 用户只需声明“我要什么资源”,Ray 负责在异构集群中找到最优位置,并协调它们一起工作。

如果你正在设计一个需要 10+ GPU 的训练系统,不要试图用单个任务请求 10 GPU,而是:

  1. 拆分为多个 Actor
  2. 用 Placement Group 控制布局
  3. 在应用层实现通信(如 PyTorch DDP / Horovod)。

Actor 模型 + 标准通信库(NCCL/GLOO)

核心思想:分工协作

组件 职责
Ray • 启动并管理多个 GPU Worker(Actor)
• 跨节点调度
• 提供 Actor 引用(用于协调)
• 容错与自动重启
NCCL / GLOO • 提供高性能 GPU/CPU 间 collective 通信(AllReduce, Broadcast 等)
• 管理底层网络传输(InfiniBand/RoCE/TCP)

Ray 不 reinvent the wheel,而是复用 PyTorch/TensorFlow 已有的成熟通信后端。

实现原理详解

步骤 1:启动多个 Trainer Actor(每个占 1 GPU)
@ray.remote(num_gpus=1)
class Trainer:
    def __init__(self, rank, world_size, master_addr, master_port):
        self.rank = rank
        # 设置环境变量(PyTorch DDP 要求)
        os.environ["MASTER_ADDR"] = master_addr
        os.environ["MASTER_PORT"] = str(master_port)
        # 初始化进程组
        dist.init_process_group(
            backend="nccl",      # 或 "gloo"(CPU)
            rank=rank,
            world_size=world_size
        )
        self.model = ...  # 加载模型
  • 每个 Trainer 是一个独立的 Python 进程;
  • Ray 通过 num_gpus=1 确保每个进程独占一张 GPU;
  • CUDA_VISIBLE_DEVICES 由 Ray 自动设置,保证进程只看到分配的 GPU。
步骤 2:建立通信拓扑(关键!)

PyTorch DDP 需要一个 “主节点”(Master) 来协调所有 Worker 的连接。在 Ray 中,有几种方式:

✅ 推荐方式:使用 Head Node IP 作为 MASTER_ADDR
# 获取 Head Node 的内部 IP(所有节点可访问)
head_ip = ray.nodes()[0]["NodeManagerAddress"]
trainers = [
    Trainer.remote(rank=i, world_size=10, master_addr=head_ip, master_port=12345)
    for i in range(10)
]
  • 所有 Trainer 进程连接到 head_ip:12345
  • 第一个连接的进程(通常是 rank=0)成为 store server(由 PyTorch 内部启动);
  • 其他进程从该 server 获取彼此的地址,建立 P2P 连接。

🔍 底层:PyTorch 使用 c10d::TCPStoreFileStore 实现 rendezvous。

步骤 3:执行分布式训练 Step
def train_step(self, batch):
    outputs = self.model(batch)
    loss = compute_loss(outputs)
    loss.backward()
    
    # 关键:AllReduce 梯度(跨所有 GPU)
    for param in self.model.parameters():
        dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
        param.grad /= self.world_size  # 平均梯度
    
    optimizer.step()
  • dist.all_reduce() 触发 NCCL AllReduce(GPU 显存 → GPU 显存);
  • NCCL 自动选择最优算法(Ring, Tree, etc.)和网络路径;
  • 整个过程不经过 CPU 或 Ray,纯 GPU 通信。
步骤 4:结果聚合(可选)

训练结束后,可通过 Ray 的 ray.get() 收集各 Actor 的结果:

losses = ray.get([t.train_step.remote(batch) for t in trainers])
avg_loss = sum(losses) / len(losses)

⚠️ 注意:通信(AllReduce)发生在训练内部,不是通过 Ray 传输数据

容错机制

  • 如果某个 Trainer 崩溃:
    • Ray 自动重启该 Actor(需用户代码支持 checkpoint);
    • PyTorch 进程组已失效,需重建整个训练作业(目前 Ray 不支持 DDP 级别细粒度恢复);
  • 生产建议:
    • 定期保存 checkpoint 到共享存储(如 S3);
    • 使用 ray.train(Ray Train)框架,内置容错支持。

训练过程流程图(Mermaid)

以下是一张 任务驱动的分布式训练流程图,展示从启动到通信的全过程:

Each Trainer (on assigned GPU)

1. Get Head Node IP
2. Launch 10 Trainers with Placement Group
2. Launch 10 Trainers
2. Launch 10 Trainers
2. Launch 10 Trainers

3a. Set MASTER_ADDR=HeadIP
MASTER_PORT=12345

3b. Same

3c. Same

4a. Connect to TCPStore
(Rank 0 becomes server)

4b. Connect to TCPStore

4c. Connect to TCPStore

5. NCCL/GLOO establishes P2P connections between all ranks
6. Each trainer runs: - Forward/Backward - dist.all_reduce(grads) - Optimizer step
7. (Optional) Return loss/metrics

Driver(ray.init())

Head Node IP

Trainer Actor (Rank 0)

Trainer Actor (Rank 1)

...

Trainer Actor (Rank 9)

Initialize Process Group
(dist.init_process_group)

Initialize Process Group

Initialize Process Group

Rendezvous Complete

Communication Topology Ready

Training Step

Driver collects via ray.get()

图中关键说明:
  1. Driver 获取 Head Node IP 作为通信协调点;
  2. 10 个 Trainer Actor 被 Ray 调度到不同节点(可能跨机器);
  3. 每个 Trainer 设置相同的 MASTER_ADDR/PORT,调用 dist.init_process_group
  4. PyTorch 内部通过 TCPStore 完成 rendezvous(地址交换);
  5. NCCL/GLOO 建立全连接通信拓扑(All-to-All);
  6. 训练过程中,梯度同步通过 NCCL AllReduce 直接在 GPU 间完成
  7. 最终指标通过 Ray 的 ray.get() 汇总(小数据量,不影响性能)。

NCCL vs GLOO 选择

后端 适用场景 特点
NCCL GPU 训练 • 高性能(专为 NVIDIA GPU 优化)
• 支持 InfiniBand/RoCE
• 仅支持 GPU 张量
GLOO CPU 训练 / 小集群 • 支持 CPU 张量
• 基于 TCP,兼容性好
• 性能低于 NCCL

✅ 在 GPU 集群上,始终优先使用 NCCL

总结

  • Ray 的角色:提供 Actor 生命周期管理 + 资源调度 + 跨节点部署
  • NCCL/GLOO 的角色:提供 高性能 collective 通信
  • 协同方式:Ray 负责“把人叫齐并分配座位”,NCCL 负责“让他们高效对话”;
  • 数据流分离
    • 控制流:通过 Ray(启动、协调、收集结果);
    • 数据流:通过 NCCL(梯度同步,绕过 Ray)。

这种设计既利用了 Ray 的易用性和弹性,又保留了底层通信库的极致性能,是构建大规模 AI 系统的理想架构。

建议
对于生产级训练,推荐使用 Ray Trainray.train.torch.TorchTrainer),它封装了上述细节,并提供自动容错、混合精度、日志集成等能力。

Ray的OOM问题和背景

因为在实际过程中确实非常容易出现OOM的问题, 而且在程序运行之前其实很难预料要使用多少内存, 很多只是基于“经验值”想要“安全的”跑任务就要提高经验值的阈值, 容易造成内存的浪费?

这个问题非常真实,也确实是 Ray 在实际生产中面临的核心挑战之一

“内存需求难以预估 → 保守估计导致资源浪费,激进估计导致 OOM”

这本质上反映了 分布式系统中“资源弹性”与“资源安全”之间的根本矛盾。我们来深入分析这个问题的根源、现状以及可能的缓解路径。

为什么内存难以准确预估?

  1. 任务行为动态性强

    • 例如 RL 中的 rollout 长度可变、LLM 推理 batch size 动态调整、数据预处理中样本大小不一。
    • Python 对象内存占用本身就不透明(如 list 扩容、pandas DataFrame 内存碎片)。
  2. 依赖外部输入

    • 任务内存消耗取决于输入数据大小,而输入往往在运行时才知道。
  3. 框架开销不可控

    • Ray 的序列化(pickle/cloudpickle)、对象存储复制、中间缓存等都会额外占用内存,且难以建模。
  4. 缺乏运行时反馈机制

    • 当前 Ray 的 memory=静态声明,无法根据实际使用动态调整或拒绝超限任务。

当前设计的两难困境

策略 后果
按经验值设高阈值(如声明 2GB,实际用 500MB) ✅ 避免 OOM
❌ 资源利用率低(调度器认为用了 2GB,实际只用 0.5GB → 并行度下降)
按理论值设低阈值(如声明 500MB,实际峰值 1.8GB) ✅ 提高并行度
❌ 极易 OOM,节点崩溃

这就是你所说的:“为了安全跑任务,不得不浪费内存”。

Ray 社区是否意识到这个问题?——是的,且正在改进

已落地的改进:
  1. ray memory 命令
    可查看每个 ObjectRef 的内存占用,帮助事后分析。
  2. 自动 Spill to Disk(对象存储溢出)
    当 shared memory 满时,自动写入磁盘(需配置),避免 OOM(但性能下降)。
  3. Placement Groups + 弹性调度
    可预留资源池,隔离关键任务。
正在推进的方向(GitHub / RFC):
  • 运行时内存监控 + 任务驱逐:当 worker 内存超限时,自动取消任务(类似 Kubernetes OOMKiller)。
  • 动态资源请求:允许任务在运行中“申请更多内存”(尚未支持)。
  • 基于历史的自适应资源估算:类似 Spark 的 Adaptive Query Execution,根据前几次运行自动调整资源请求。

实用建议:如何在现有框架下平衡安全与效率?

1. 采用“分层资源策略”
  • 关键任务:保守估计 + Placement Group 预留
  • 弹性任务:激进估计 + 设置超时/重试 + 监控 OOM
2. 用容器/K8s 提供硬隔离
# KubeRay Pod spec 示例
resources:
  requests:
    memory: "1Gi"   # Ray 调度用(软)
  limits:
    memory: "2Gi"   # K8s 强制限制(硬)

这样即使 Ray 任务超用,K8s 会 kill 容器而非整个节点。

主动探测内存使用

在任务开始时采样内存:

import psutil
@ray.remote
def my_task(data):
    start_mem = psutil.Process().memory_info().rss
    # ... do work ...
    end_mem = psutil.Process().memory_info().rss
    print(f"Used {(end_mem - start_mem) / 1e6:.1f} MB")

积累几次后,用 P95 作为 memory= 的参考值。

限制单个节点的任务并发数

通过 num_cpus=1 或自定义资源(如 @ray.remote(resources={"task_slot": 1}))控制密度,避免“雪崩式 OOM”。

启用对象存储溢出(Spilling)
ray start --object-store-memory=4000000000 \
          --object-spilling-config='{"type":"filesystem","params":{"directory_path":"/tmp/spill"}}'

即使内存超用,也能 fallback 到磁盘,保住节点不崩溃(牺牲性能换可用性)。

更深层思考:这是 Ray 的问题,还是分布式系统的通病?

其实,几乎所有通用分布式框架都面临类似问题

框架 资源模型 是否硬隔离 内存预估难度
Spark spark.executor.memory 否(依赖 YARN/K8s) 高(需调优)
Dask memory_limit(per worker)
Flink taskmanager.memory 否(需配 JVM)
Ray memory=(per task) 极高(因任务粒度细)

💡 任务越细粒度,资源预估越难。Ray 的“微任务”模型放大了这一问题。

总结:不是设计失败,而是领域难题

  • Ray 的“约定式资源”不是 bug,而是为性能和灵活性做出的 conscious trade-off
  • OOM 风险确实存在,且对新手不友好,但这可以通过部署架构(如容器化)、监控工具和经验积累来缓解。
  • 理想方案是“软声明 + 硬兜底”:Ray 负责高效调度,底层平台(K8s/cgroups)负责安全隔离——这也是工业界的标准实践。

最终建议
不要试图在裸机上“完美配置 Ray 内存”,而是 把 Ray 放在一个有资源边界的环境中运行(如 Docker/K8s),让它专注做擅长的事——快速、灵活地执行分布式任务

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐