Ray 设计思想总结
Ray 是为动态、细粒度、高并发的 AI/ML 工作负载量身打造的运行时,其Object Store + 异步 Pull + 去中心化调度架构极具启发性。但在借鉴时,应聚焦其内存模型与调度哲学,而非简单套用序列化方式。同时,明确其边界——不是万能分布式框架,而是 AI 时代的“分布式胶水”。Ray 的异步 pull 模型是其面向动态、细粒度、高并发 AI 任务的核心设计之一。它通过“懒加载 + 按
设计背景
Ray 是一个为通用分布式计算而设计的开源框架,其设计背景主要源于以下几个方面的需求和挑战:
人工智能与机器学习工作负载的复杂性增长
随着强化学习(Reinforcement Learning)、超参数调优(Hyperparameter Tuning)、模型服务(Model Serving)等 AI 应用的发展,传统的大数据处理框架(如 Hadoop、Spark)在应对低延迟、高并发、异构任务调度等方面显得力不从心。这些新兴 AI 工作负载通常具有以下特点:
- 任务粒度细(毫秒级甚至微秒级)
- 动态任务生成(例如强化学习中的 rollout 会动态触发训练任务)
- 混合计算模式(CPU/GPU、同步/异步、批处理/流处理)
- 对低延迟和高吞吐有严格要求
现有系统无法满足灵活、高效的分布式执行需求
传统的分布式系统(如 Spark)基于批处理模型,任务调度开销大,难以支持细粒度、动态的任务图(task graph)。而高性能计算(HPC)或消息传递接口(MPI)虽然性能高,但编程模型复杂、容错能力弱、不易扩展到大规模异构集群。
需要统一的编程模型支持多种 AI 场景
研究人员和工程师希望有一个统一的框架,既能用于分布式训练、也能用于推理服务、自动调参、模拟仿真等场景,而无需为每种任务使用不同的系统。Ray 的设计目标之一就是提供一个“通用”的分布式运行时,通过简单的 Python API 实现复杂的分布式应用。
来自伯克利 RISELab 的研究驱动
Ray 最初由加州大学伯克利分校的 RISELab(前身为 AMPLab,也是 Spark 的诞生地)于 2017 年左右开始研发。RISELab 的研究重点是“实时智能与安全”,因此 Ray 被设计为支持低延迟、高吞吐、可扩展且容错的分布式执行引擎,特别适合构建实时 AI 系统。
对易用性和可扩展性的双重追求
Ray 强调“简单即强大”:用户只需通过 @ray.remote 装饰器即可将普通 Python 函数或类变为分布式可执行单元,底层自动处理任务调度、对象存储、故障恢复等复杂问题。同时,Ray 的架构(如基于共享内存的对象存储、去中心化的任务调度)使其能够线性扩展到数千节点。
Ray 的设计背景是为了解决现代 AI 和机器学习应用中对低延迟、高并发、动态任务调度、异构资源利用和易用性的综合需求,填补了传统大数据框架与高性能计算系统之间的空白,成为一个面向未来智能应用的通用分布式计算平台。
项目定位与目标
解决什么问题?
细粒度任务调度与低延迟执行
- 传统大数据框架(如 Spark)面向批处理,任务调度开销大(通常数百毫秒),难以支持毫秒级甚至微秒级的任务。
- Ray 支持细粒度、动态生成的任务图(task graph),适用于强化学习中的 rollout、超参搜索中的 trial 等场景。
动态与异构工作负载的支持不足
- AI 工作流常包含混合任务:CPU 计算、GPU 推理、模拟环境、模型训练、在线服务等。
- Ray 能统一调度异构资源(CPU/GPU/TPU)并支持动态任务依赖(例如一个任务的结果决定后续启动哪些任务)。
缺乏统一的编程模型
- 过去开发者需为不同场景(训练、调参、部署)使用不同工具(如 Spark + Kubernetes + 自定义调度器)。
- Ray 提供统一的运行时和 API,覆盖从实验到生产的全生命周期。
容错性与可扩展性之间的权衡
- 高性能系统(如 MPI)通常牺牲容错;容错系统(如 Hadoop)牺牲性能。
- Ray 在保证低延迟和高吞吐的同时提供任务级容错(自动重试失败任务)。
Python 生态的分布式能力缺失
- Python 是 AI 主流语言,但原生不支持高效分布式执行。
- Ray 以Python 为第一语言,通过极简 API(如
@ray.remote)实现透明分布式化。
目标用户是谁?
AI/ML 研究人员与工程师
- 需要快速实验强化学习、超参数优化、大规模模拟等场景。
- 希望用 Python 编写分布式代码而无需深入底层系统细节。
MLOps 与平台团队 - 构建统一的 AI 平台,整合训练、调参、推理、监控等环节。
- 利用 Ray 的可扩展性和资源隔离能力部署生产级服务。
数据科学家与应用开发者
- 希望将单机脚本轻松扩展到多机集群,提升计算效率。
- 使用 Ray Data、Ray Serve 等高级库进行数据处理与模型部署。
企业与云服务商
- 如 Amazon(SageMaker)、Google(Vertex AI)、Microsoft(Azure ML)等已集成 Ray。
- 用于构建托管 AI 服务,降低客户使用分布式 AI 的门槛。
核心设计理念是什么?
Task-Based Execution Model(基于任务的执行模型)
- 将计算抽象为无状态的任务(tasks) 和有状态的 Actor(actors)。
- 任务是函数调用,Actor 是可跨节点共享的状态对象(如模型服务器、环境模拟器)。
Shared-Memory Object Store(共享内存对象存储)
- 使用 Apache Arrow 格式在节点内通过零拷贝共享内存传递数据,极大减少序列化/反序列化开销。
- 跨节点通过高效 RPC 传输,支持大对象缓存与复用。
Decentralized, Scalable Scheduling(去中心化调度)
- 采用两级调度架构:全局调度器(Global Scheduler) + 本地调度器(Local Scheduler)。
- 支持线性扩展至数千节点,调度延迟低(<1ms)。
Language-First, Developer-Centric(以语言和开发者为中心)
- 不要求用户学习新 DSL 或配置复杂集群。
- 通过装饰器(
@ray.remote)将普通 Python 函数/类“一键分布式化”。
Unified Runtime for AI Workflows(AI 工作流的统一运行时)
- 构建上层库(如 Ray Tune、Ray RLlib、Ray Serve)共享同一底层运行时,避免系统碎片化。
整体架构概览
主要组件有哪些?它们如何协作?
Ray 的架构设计围绕“高性能、易用性、可扩展性”三大目标,其系统由多个协同工作的核心组件构成。这些组件共同提供了一个统一的分布式运行时,支持从细粒度任务到有状态服务的各类 AI 工作负载。以下是 Ray 的主要组件及其协作方式:
Driver(驱动程序)
- 作用:用户编写的 Python 程序入口(即主进程),负责提交任务、创建 Actor、获取结果。
- 特点:
- 每个 Ray 应用启动时会有一个 Driver。
- 不执行计算任务,只负责协调和调度请求。
- 协作:向全局控制层(GCS)注册自身,并通过本地调度器提交任务。
Worker(工作进程)
- 作用:实际执行任务(tasks)或托管 Actor 的进程。
- 类型:
- 无状态 Worker:执行普通函数任务(
@ray.remote函数)。 - Actor Worker:运行有状态的 Actor 对象(
@ray.remote类实例)。
- 无状态 Worker:执行普通函数任务(
- 特点:
- 每个节点可启动多个 Worker。
- 自动从对象存储读取输入、将输出写回对象存储。
- 协作:从本地调度器领取任务,执行后将结果存入对象存储,并通知调度器任务完成。
Object Store(对象存储)
- 作用:高效存储和共享任务输入/输出数据(称为“对象”,Objects)。
- 关键技术:
- 基于 Apache Arrow 内存格式。
- 节点内使用 共享内存(Plasma),实现零拷贝数据访问。
- 跨节点通过 gRPC + 自定义传输协议 复制对象。
- 协作:
- Worker 执行任务前从 Object Store 读取依赖对象。
- 任务结果直接写入本地 Object Store。
- 若其他节点需要该对象,自动触发跨节点拉取。
优势:避免重复序列化,大幅提升数据密集型任务性能。
Scheduler(调度器)
Ray 采用两级调度架构:
a) Global Scheduler(全局调度器)
- 作用:决定任务应分配到哪个节点(基于资源可用性、位置亲和性等)。
- 部署:通常与 Head Node 上的 GCS 共置。
b) Local Scheduler(本地调度器,现称 Core Worker Scheduler)
-
作用:在本节点内将任务分发给空闲 Worker。
-
特点:轻量、低延迟(微秒级)。
-
协作流程:
- Driver 提交任务 → Local Scheduler(若本地资源不足)→ 转发给 Global Scheduler。
- Global Scheduler 选择目标节点 → 将任务路由到该节点的 Local Scheduler。
- Local Scheduler 分配给 Worker 执行。
支持动态资源感知(CPU/GPU/自定义资源)和负载均衡。
Global Control Store (GCS)
- 作用:Ray 的“元数据中心”,存储集群的全局状态。
- 管理内容:
- 节点注册与心跳(Node Manager 注册信息)
- Actor 生命周期(位置、状态)
- 任务元数据(DAG 依赖、状态)
- 对象位置映射(哪些节点持有某对象)
- 实现:早期基于 Redis,新版本(Ray 2.0+)逐步迁移到更高效的内部存储(如 GCS Server + Raft 共识)。
- 协作:
- 所有节点(Head + Workers)定期向 GCS 报告状态。
- 调度器查询 GCS 获取集群视图。
- 容错时用于重建失败 Actor 或任务。
Raylet(节点管理器)
- 作用:每个非 Head 节点上的本地守护进程,整合了 Local Scheduler 和 Object Store 的控制逻辑。
- 功能:
- 管理本节点的资源(CPU/GPU/内存)
- 启动/监控 Worker 进程
- 与 GCS 通信汇报状态
- 协调本地任务调度与对象存储
- 协作:
- 向 GCS 注册节点能力。
- 接收来自其他节点的任务请求。
- 触发对象跨节点传输。
Raylet 是 Ray 实现去中心化、可扩展性的关键组件。
Core Worker(核心工作库)
- 作用:嵌入在每个 Python/Java/C++ 进程中的客户端库,提供 Ray API 的底层实现。
- 功能:
- 管理任务提交、对象引用(ObjectRef)
- 与本地 Raylet 通信
- 执行任务或 Actor 方法
- 协作:
- Driver 和 Worker 都包含 Core Worker。
- 将用户代码(如
f.remote())转化为内部任务描述并提交。
画一张简单的架构图(可用文字描述)
全局概览图
关键设计与实现机制
选择 2–3 个最具代表性的设计点深入分析
🔹 设计点 1:GCS —— 全局控制存储(Global Control Store)
-
问题背景
在大规模分布式集群中,如何实现:- 节点动态加入/退出的自动发现?
- Actor 和任务结果对象的全局定位?
- 调度器获取一致的集群资源视图(CPU/GPU/自定义资源)?
-
解决方案
GCS 作为集群的统一元数据中心,提供以下核心服务:- 注册表:记录所有活跃节点、Actor 实例、对象(Object)的位置信息;
- 资源目录:维护每个节点的资源总量与已分配量;
- 状态同步:支持任务/Actor 的容错重建(通过元数据恢复执行上下文)。
-
关键技术演进
- Ray < 2.0:基于 Redis 作为后端存储,GCS 逻辑运行在 Head Node;
- Ray ≥ 2.0:引入 原生 GCS Server,支持 Raft 共识协议(为未来高可用做准备),逐步减少对 Redis 的依赖;
- 通信协议:通过 gRPC 与各节点 Raylet 同步心跳和状态。
-
优点
- 提供强一致的全局视图,支撑智能调度与容错;
- 解耦控制平面与数据平面(GCS 不传数据,只管“位置”);
- 支持动态扩缩容和异构资源管理。
-
缺点 / 挑战
- 当前开源版 GCS 仍为单点(Head Node 上),存在单点故障风险;
- 元数据规模随集群增长而膨胀,可能成为性能瓶颈;
- Raft HA 模式尚未在社区版默认启用,生产环境需额外保障。
🔹 设计点 2:Raylet —— 节点级自治运行时
-
问题背景
如何在保持全局协调的同时,实现低延迟、去中心化的任务执行?如何让每个节点独立管理资源、调度任务、处理故障? -
解决方案
每个节点部署一个 Raylet 守护进程,作为本地“微型操作系统”,集成三大核心能力:- 本地调度器(Local Scheduler):接收任务并分配给空闲 Worker;
- 节点资源管理器:跟踪 CPU/GPU/内存使用,上报 GCS;
- 对象存储协调器:管理本地 Object Store 的读写与跨节点拉取。
-
关键技术
- 两级调度架构:本地快速调度 + 全局后备协调;
- 资源抽象模型:支持自定义资源(如
{"GPU": 1, "special_hw": 1}); - 与 Core Worker 的 IPC 通信:通过 Unix Domain Socket 或 TCP 高效交互。
-
优点
- 低调度延迟(微秒级),适合细粒度任务;
- 去中心化:即使 GCS 短暂不可用,本地任务仍可执行;
- 资源隔离:每个节点独立管理,避免全局锁竞争。
-
缺点
- 每个节点需常驻 Raylet 进程,带来固定资源开销(约 100–300MB 内存);
- 调度策略相对简单(主要基于资源可用性),不如 Kubernetes 灵活;
- 跨节点任务依赖仍需 GCS 协调,无法完全脱离中心控制。
🔹 设计点 3:Object Store —— 高性能分布式对象存储
-
问题背景
AI 工作负载常涉及大量中间数据(如 rollout 样本、梯度、模型参数),如何:- 避免重复序列化/反序列化?
- 实现同节点多任务零拷贝共享?
- 高效支持跨节点数据拉取?
-
解决方案
每个节点部署一个 基于共享内存的对象存储,特点包括:- 使用 Apache Arrow 内存格式 存储不可变对象;
- 同节点内通过 共享内存(Shared Memory) 实现零拷贝访问;
- 跨节点通过 异步 pull 模型 拉取对象(由请求方主动触发)。
-
关键技术
- Plasma(早期)→ Arrow Shared Memory(当前):统一内存布局;
- 引用计数 + 自动驱逐(Spilling):内存不足时 spill 到磁盘;
- 对象位置缓存:Core Worker 本地缓存对象位置,减少 GCS 查询;
- 批量传输优化:大对象分块传输,避免网络阻塞。
-
优点
- 极大降低数据传输开销,尤其适合迭代式 AI 计算;
- 天然支持数据复用(多个任务消费同一对象无需复制);
- 与 Python 对象无缝集成(通过
ray.put()/ray.get())。
-
缺点
- 内存敏感:大对象或高并发易导致 OOM,需合理配置
object_store_memory; - 跨节点传输仍依赖网络带宽,大模型场景可能成为瓶颈;
- 不支持对象更新(Immutable Design),不适合频繁修改的场景。
- 内存敏感:大对象或高并发易导致 OOM,需合理配置
这三者共同构成了 Ray “去中心化执行 + 中心化协调 + 高效数据共享” 的独特架构,特别适合现代 AI 工作负载
我的收获与启发
把“学到的东西”转化为“我能用的东西”
| 启发 | 我可以怎么应用到实际工作中? |
|---|---|
| 基于共享内存的对象存储 + 零拷贝访问 | 在构建高性能 Python 数据处理服务时,可借鉴 Ray 的思路: • 使用 Apache Arrow(而非 pickle)作为内存数据格式,实现跨进程/线程零拷贝共享; • 对高频复用的中间结果(如特征向量、预处理数据),缓存在共享内存中,避免重复序列化开销; • 若无法引入 Ray,可用 multiprocessing.shared_memory + Arrow 手动实现轻量级对象缓存。 |
| 异步 Pull 模型 + 按需加载 | 在微服务或分布式任务系统中: • 设计数据依赖时采用“懒加载”策略,仅在消费者真正需要时才触发数据传输; • 通过全局元数据服务(如 etcd/Redis)记录数据位置,解耦生产者与消费者; • 避免提前广播大数据,节省带宽。 |
| 去中心化调度 + 节点自治(Raylet) | 在边缘计算或混合云场景: • 让每个节点具备本地调度能力,减少对中心控制平面的依赖; • 本地优先执行任务,仅在资源不足时请求全局协调,提升响应速度与容错性。 |
延伸思考(可选)
- 如果让你改进 Ray,你会做什么?
-
智能网络感知调度
当前 Ray 调度器主要基于 CPU/GPU 资源,忽略网络拓扑。可扩展为:
• 自动探测节点间带宽/延迟(如通过心跳包测速);
• 对于大对象依赖的任务,优先调度到同机架/同可用区的节点;
• 对网络较差的节点(如跨云、边缘设备),自动限制其接收大对象任务,或启用压缩传输。 -
增强 GCS 高可用与持久化
开源版 GCS 仍是单点。可:
• 默认启用 Raft 多副本模式;
• 支持将元数据快照持久化到 S3 或数据库,实现 Head 节点完全故障恢复。 -
对象存储的分层存储(Tiered Storage)
当前 Spilling 仅支持内存 → 磁盘。可扩展为:
• 热数据:内存(Object Store)
• 温数据:本地 SSD
• 冷数据:远程对象存储(如 S3)
并自动根据访问频率迁移,支撑超大规模中间数据。 -
失败任务的数据血缘追踪
当 Worker 崩溃导致对象丢失,Ray 会重试任务,但不记录数据血缘。可集成类似 LineageStore 的机制,快速重建丢失对象,而非从头执行整个 DAG。
- Ray 不适合什么场景?
| 场景 | 原因 | 替代方案建议 |
|---|---|---|
| 强一致性事务处理 | Ray 的对象是不可变的,无 ACID 保证,不适合银行转账等场景 | 使用传统数据库(PostgreSQL, Spanner) |
| 超低延迟实时流处理(<1ms) | 异步 pull 模型有调度+网络延迟,无法满足硬实时要求 | 使用 Flink、Kafka Streams 或专用 DSP |
| 长时间运行的有状态服务(非 Actor 模式) | 虽然 Actor 可长期运行,但 Ray 本身不是为通用 Web 服务设计 | 使用 FastAPI + Kubernetes,或 Ray Serve(仅限模型推理) |
| 极小规模单机任务 | Ray 启动开销(约 1–2 秒)和内存占用(Head + Raylet)过高 | 直接用 Python 多进程/线程或 asyncio |
| 需要精细网络 QoS 控制的环境 | Ray 不提供带宽限速、优先级队列等网络控制 | 在 Kubernetes 层面通过 CNI 插件实现 |
总结
Ray 是为 动态、细粒度、高并发的 AI/ML 工作负载量身打造的运行时,其 Object Store + 异步 Pull + 去中心化调度 架构极具启发性。但在借鉴时,应聚焦其内存模型与调度哲学,而非简单套用序列化方式。同时,明确其边界——不是万能分布式框架,而是 AI 时代的“分布式胶水”。
参考资料
- 官方文档链接
- GitHub 仓库
- 推荐阅读文章或视频
对象传输和发现解析
为什么需要“异步 pull 模型”?—— 设计动机
在分布式系统中,任务 A 在节点 X 产生一个对象 O,任务 B 在节点 Y 需要消费 O。如何把 O 从 X 传到 Y?
传统方案(如 Spark)常采用 push 模式:生产者(X)主动将数据推送给所有消费者(Y)。
但 Ray 面向的是 动态、细粒度、高并发的 AI 任务,具有以下特点:
- 消费者可能尚未创建(例如强化学习中 rollout 动态触发训练任务);
- 一个对象可能被多个未知消费者使用;
- 网络带宽宝贵,应避免提前推送未被请求的数据。
因此,Ray 选择 pull 模型:只有当消费者明确需要对象时,才触发拉取。
核心思想:“按需拉取,懒加载”(Lazy Fetching)
异步 Pull 模型的工作原理
1. 对象标识与位置注册
- 每个对象由全局唯一的 Object ID 标识(如
ffffffffffffffffffffffff010000c801000000)。 - 当 Worker 在节点 X 的 Object Store 中创建对象 O 后:
- Object Store 通知本地 Raylet;
- Raylet 将 (Object ID → Node X) 的映射注册到 GCS。
2. 消费者发起请求
- 当另一个节点 Y 上的任务调用
ray.get(object_ref)时:- 节点 Y 的 Core Worker 查询本地缓存是否已有该对象;
- 若无,则向本地 Raylet 发起“获取对象 O”的请求。
3. Raylet 触发异步拉取
- 节点 Y 的 Raylet 向 GCS 查询 “Object O 存在于哪些节点?”;
- GCS 返回一个或多个持有该对象的节点列表(如 [Node X, Node Z]);
- Raylet 选择一个源节点(通常基于网络拓扑、负载等策略),并向其 Raylet 发送 Pull Request。
4. 异步数据传输
- 源节点(X)的 Raylet 从本地 Object Store 读取对象 O;
- 通过 gRPC 流式传输 将 O 发送给目标节点 Y;
- 目标节点 Y 的 Raylet 接收数据后,写入本地 Object Store;
- 整个过程不阻塞消费者线程:
ray.get()在后台等待,直到对象就绪。
5. 通知与就绪
- 对象写入 Y 的 Object Store 后,触发 就绪通知;
- Core Worker 被唤醒,
ray.get()返回数据; - 同时,Y 的 Raylet 向 GCS 注册:“我也持有对象 O”,供后续消费者复用。
关键:拉取是异步的、非阻塞的、按需触发的
关键技术机制
对象位置缓存(Location Caching)
- 每个节点的 Raylet 会缓存常用对象的位置信息,减少对 GCS 的查询压力。
- 缓存通过 GCS 的 订阅机制(Pub/Sub) 自动更新(如对象被驱逐时通知)。
多副本支持(Multiple Locations)
- 一个对象可存在于多个节点(如广播场景);
- Pull 时可从最近或最空闲的副本拉取,提升吞吐。
批量与流式传输
- 大对象被切分为 chunks,通过 gRPC 流式发送,避免内存峰值;
- 支持 并行拉取多个对象,提高带宽利用率。
背压与流控(Backpressure)
- 若接收方 Object Store 内存不足,会暂停拉取,防止 OOM;
- 结合 Spilling(溢出到磁盘) 机制缓解内存压力。
引用计数与生命周期
- 对象在 Object Store 中通过 引用计数 管理生命周期;
- 当所有引用(包括远程拉取中的)释放后,对象被自动回收。
一个完整流程示例
# Node A
@ray.remote
def produce():
return np.random.rand(1000, 1000) # 大数组
# Node B
@ray.remote
def consume(data):
return data.sum()
# Driver (on Node C)
obj_ref = produce.remote() # 任务在 Node A 执行,对象存于 A
result = consume.remote(obj_ref) # 任务调度到 Node B,B 需要 obj_ref
执行流程:
produce在 Node A 完成,对象 O 存入 A 的 Object Store;- A 的 Raylet 向 GCS 注册:O ∈ {A};
consume被调度到 Node B;- B 的 Worker 执行
consume,发现需要 O; - B 的 Raylet 查询 GCS → 得知 O 在 A;
- B 的 Raylet 向 A 的 Raylet 发起 Pull Request;
- A 通过 gRPC 将 O 异步发送给 B;
- B 将 O 存入本地 Object Store;
consume读取 O 并计算,返回结果。
整个过程中,Driver 和 Worker 均无需关心“数据在哪”、“怎么传”,完全透明。
优势 vs 局限
| 优势 | 局限 / 挑战 |
|---|---|
| ✅ 按需传输:避免无效网络开销 | ❌ 首次访问延迟:pull 引入额外 RTT |
| ✅ 解耦生产者与消费者:生产者无需知道谁会消费 | ❌ GCS 成为元数据瓶颈:高频查询可能压力大 |
| ✅ 天然支持多消费者:每个消费者独立拉取 | ❌ 大对象拉取可能阻塞网络(需配合流控) |
| ✅ 易于扩展:新增节点自动参与 pull 网络 | ❌ 不适用于实时流场景(pull 是请求驱动,非 push 实时) |
与其他系统的对比
| 系统 | 数据传输模型 | 适用场景 |
|---|---|---|
| Ray | 异步 Pull | 细粒度任务、动态 DAG、AI 工作流 |
| Spark | Push(Shuffle) | 批处理、静态 DAG |
| Dask | Pull + Caching | 中等粒度任务,Python 生态 |
| MPI | 显式 Send/Recv | HPC,程序员手动管理通信 |
总结
Ray 的 异步 pull 模型 是其面向动态、细粒度、高并发 AI 任务的核心设计之一。它通过 “懒加载 + 按需拉取 + 共享内存 + 异步传输” 的组合,在保证易用性的同时,实现了高性能和资源效率的平衡。虽然在首次访问时有轻微延迟,但在典型 AI 工作负载(如多次迭代、数据复用)中,其优势远大于代价。
逻辑资源隔离和多资源协同调度
逻辑隔离:不是物理隔离,而是“资源记账 + 调度约束”
✅ 什么是“逻辑隔离”?
Ray 不直接控制硬件,而是通过以下方式实现“软隔离”:
- 资源记账(Resource Accounting):每个节点维护一个资源池(如
{"CPU": 32.0, "GPU": 8}); - 调度准入控制(Admission Control):任务只有在资源足够时才被调度;
- 运行时无强制限制:任务一旦启动,可使用任意 CPU/GPU(依赖 OS/框架隔离)。
📌 关键:逻辑隔离 = “你只能申请这么多,但用了多少我们不管”。
多资源协同:统一资源模型支持任意组合
Ray 将所有资源视为 键值对(key-value),支持 CPU + GPU + 自定义资源 的任意组合。
1. 资源类型
| 类型 | 示例 | 特性 |
|---|---|---|
| 内置资源 | "CPU", "GPU" |
自动探测,可分数(CPU)或整数(GPU) |
| 自定义资源 | "TPU", "FPGA", "license" |
用户定义,整数单位 |
2. 任务/Actor 声明多资源
@ray.remote(
num_cpus=4, # 4 个逻辑 CPU
num_gpus=2, # 2 张 GPU
resources={"special_hw": 1} # 1 个自定义硬件
)
def distributed_train():
...
✅ 协同含义:该任务必须同时满足这三类资源才可调度。
调度机制:如何实现多资源协同分配?
Ray 采用 两级调度 + 全局资源视图 实现多资源协同:
步骤 1:任务提交
- Driver 提交任务,携带资源需求
{CPU:4, GPU:2, special_hw:1}。
步骤 2:本地调度尝试
- 本地 Raylet 检查本机剩余资源:
available = {"CPU": 8.0, "GPU": 1, "special_hw": 1} - 因
GPU:1 < 2,本地拒绝。
步骤 3:全局调度介入
- 本地 Raylet 向 Head Node 的 Global Scheduler 请求;
- Global Scheduler 查询 GCS 获取全集群资源快照:
Node A: {"CPU": 16, "GPU": 4, "special_hw": 1} Node B: {"CPU": 32, "GPU": 8, "special_hw": 0} - 匹配规则:找一个节点满足 所有资源 ≥ 请求;
- 选择 Node A(唯一满足
special_hw:1的节点)。
步骤 4:资源预留与执行
- GCS 标记 Node A 的资源为“已分配”:
Node A allocated: {"CPU":4, "GPU":2, "special_hw":1} - 任务被路由到 Node A 的 Raylet;
- Raylet 启动 Worker,设置环境变量(如
CUDA_VISIBLE_DEVICES=0,1); - 任务开始执行。
🔁 释放:任务完成后,资源自动归还。
典型案例:需要 10 个 GPU 的分布式训练
假设你要运行一个 需要 10 张 GPU 的 PyTorch DDP 训练任务,而单机最多 8 GPU。
方案 1:单 Actor 跨节点(不推荐)
- ❌ Ray 不支持单个任务跨节点分配 GPU!
@ray.remote(num_gpus=10)会因无单机满足而永远 pending。
✅ 方案 2:多 Actor 协同(Ray 的标准做法)
设计思路:
- 将 10 GPU 任务拆分为 10 个 Worker Actor,每个占 1 GPU;
- 通过 Ray 的 Actor 引用 + NCCL/GLOO 通信 实现分布式训练。
代码示例:
@ray.remote(num_gpus=1)
class Trainer:
def __init__(self, rank):
self.rank = rank
# 初始化 PyTorch DDP
os.environ["MASTER_ADDR"] = ray.get(ray.nodes()[0]["NodeManagerAddress"])
os.environ["MASTER_PORT"] = "12345"
dist.init_process_group("nccl", rank=rank, world_size=10)
def train(self, data_shard):
# 执行分布式训练 step
return loss
# 启动 10 个 Trainer(自动跨节点调度)
trainers = [Trainer.remote(rank=i) for i in range(10)]
# 并行执行
losses = ray.get([t.train.remote(shard) for t, shard in zip(trainers, data_shards)])
调度过程:
- Ray 遍历集群,找到 至少 10 张 GPU 的总和(如 2 节点 × 8 GPU = 16 > 10);
- 将 10 个
Trainer分配到不同节点(如 Node A: 6 个, Node B: 4 个); - 每个
Trainer独占 1 GPU,通过CUDA_VISIBLE_DEVICES隔离; - 应用层(PyTorch)负责跨节点通信(需确保网络互通)。
✅ 优势:
- 充分利用集群 GPU 资源;
- 自动处理节点异构(如部分节点有 A100,部分有 V100);
- 容错:若一个 Trainer 失败,可重建并恢复训练。
高级协同:资源亲和性与放置组(Placement Groups)
对于 强耦合任务(如 AllReduce 需要低延迟网络),Ray 提供 Placement Group 实现资源捆绑分配。
示例:10 GPU + 低延迟要求
# 创建一个 placement group,要求 10 个 GPU 在同一节点(或同一 rack)
pg = ray.util.placement_group(
bundles=[{"GPU": 1}] * 10, # 10 个 bundle,每个 1 GPU
strategy="STRICT_PACK" # 必须在同一节点
)
# 等待资源就绪
ray.get(pg.ready())
# 在 placement group 中启动 Actors
trainers = [
Trainer.options(placement_group=pg).remote(rank=i)
for i in range(10)
]
策略说明:
| 策略 | 行为 | 适用场景 |
|---|---|---|
STRICT_PACK |
所有 bundle 必须在同一节点 | 单机多卡训练 |
PACK |
尽量打包,允许跨节点 | 默认行为 |
SPREAD |
尽量分散到不同节点 | 容错优先 |
STRICT_SPREAD |
必须分散到不同节点 | 高可用服务 |
Placement Group 是 Ray 实现“多资源协同+拓扑感知”的关键抽象。
容错与弹性扩展
- 任务失败:Ray 自动重试(可配置),重建 Actor 并重新分配资源;
- 节点故障:GCS 检测心跳丢失,标记资源为不可用,后续任务避开该节点;
- 动态扩缩容:新节点加入后,自动注册资源,Global Scheduler 立即可用。
总结:Ray 如何做到逻辑隔离与多资源协同?
| 能力 | 实现机制 | 用户价值 |
|---|---|---|
| 逻辑隔离 | 资源记账 + 调度准入控制 | 防止过载,保证任务可调度性 |
| 多资源协同 | 统一资源模型 + 全局调度 | 支持 CPU/GPU/自定义资源任意组合 |
| 跨节点 GPU 利用 | 多 Actor 模式 + Placement Group | 突破单机 GPU 限制 |
| 拓扑感知 | Placement Group 策略 | 优化通信性能(同节点/同 rack) |
| 弹性容错 | GCS 状态管理 + 自动重试 | 保障长时间训练任务可靠性 |
✨ 核心思想:
“把复杂留给自己,把简单留给用户” —— 用户只需声明“我要什么资源”,Ray 负责在异构集群中找到最优位置,并协调它们一起工作。
如果你正在设计一个需要 10+ GPU 的训练系统,不要试图用单个任务请求 10 GPU,而是:
- 拆分为多个 Actor;
- 用 Placement Group 控制布局;
- 在应用层实现通信(如 PyTorch DDP / Horovod)。
Actor 模型 + 标准通信库(NCCL/GLOO)
核心思想:分工协作
| 组件 | 职责 |
|---|---|
| Ray | • 启动并管理多个 GPU Worker(Actor) • 跨节点调度 • 提供 Actor 引用(用于协调) • 容错与自动重启 |
| NCCL / GLOO | • 提供高性能 GPU/CPU 间 collective 通信(AllReduce, Broadcast 等) • 管理底层网络传输(InfiniBand/RoCE/TCP) |
✅ Ray 不 reinvent the wheel,而是复用 PyTorch/TensorFlow 已有的成熟通信后端。
实现原理详解
步骤 1:启动多个 Trainer Actor(每个占 1 GPU)
@ray.remote(num_gpus=1)
class Trainer:
def __init__(self, rank, world_size, master_addr, master_port):
self.rank = rank
# 设置环境变量(PyTorch DDP 要求)
os.environ["MASTER_ADDR"] = master_addr
os.environ["MASTER_PORT"] = str(master_port)
# 初始化进程组
dist.init_process_group(
backend="nccl", # 或 "gloo"(CPU)
rank=rank,
world_size=world_size
)
self.model = ... # 加载模型
- 每个
Trainer是一个独立的 Python 进程; - Ray 通过
num_gpus=1确保每个进程独占一张 GPU; CUDA_VISIBLE_DEVICES由 Ray 自动设置,保证进程只看到分配的 GPU。
步骤 2:建立通信拓扑(关键!)
PyTorch DDP 需要一个 “主节点”(Master) 来协调所有 Worker 的连接。在 Ray 中,有几种方式:
✅ 推荐方式:使用 Head Node IP 作为 MASTER_ADDR
# 获取 Head Node 的内部 IP(所有节点可访问)
head_ip = ray.nodes()[0]["NodeManagerAddress"]
trainers = [
Trainer.remote(rank=i, world_size=10, master_addr=head_ip, master_port=12345)
for i in range(10)
]
- 所有
Trainer进程连接到head_ip:12345; - 第一个连接的进程(通常是 rank=0)成为 store server(由 PyTorch 内部启动);
- 其他进程从该 server 获取彼此的地址,建立 P2P 连接。
🔍 底层:PyTorch 使用
c10d::TCPStore或FileStore实现 rendezvous。
步骤 3:执行分布式训练 Step
def train_step(self, batch):
outputs = self.model(batch)
loss = compute_loss(outputs)
loss.backward()
# 关键:AllReduce 梯度(跨所有 GPU)
for param in self.model.parameters():
dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
param.grad /= self.world_size # 平均梯度
optimizer.step()
dist.all_reduce()触发 NCCL AllReduce(GPU 显存 → GPU 显存);- NCCL 自动选择最优算法(Ring, Tree, etc.)和网络路径;
- 整个过程不经过 CPU 或 Ray,纯 GPU 通信。
步骤 4:结果聚合(可选)
训练结束后,可通过 Ray 的 ray.get() 收集各 Actor 的结果:
losses = ray.get([t.train_step.remote(batch) for t in trainers])
avg_loss = sum(losses) / len(losses)
⚠️ 注意:通信(AllReduce)发生在训练内部,不是通过 Ray 传输数据!
容错机制
- 如果某个
Trainer崩溃:- Ray 自动重启该 Actor(需用户代码支持 checkpoint);
- 但 PyTorch 进程组已失效,需重建整个训练作业(目前 Ray 不支持 DDP 级别细粒度恢复);
- 生产建议:
- 定期保存 checkpoint 到共享存储(如 S3);
- 使用
ray.train(Ray Train)框架,内置容错支持。
训练过程流程图(Mermaid)
以下是一张 任务驱动的分布式训练流程图,展示从启动到通信的全过程:
图中关键说明:
- Driver 获取 Head Node IP 作为通信协调点;
- 10 个 Trainer Actor 被 Ray 调度到不同节点(可能跨机器);
- 每个 Trainer 设置相同的
MASTER_ADDR/PORT,调用dist.init_process_group; - PyTorch 内部通过 TCPStore 完成 rendezvous(地址交换);
- NCCL/GLOO 建立全连接通信拓扑(All-to-All);
- 训练过程中,梯度同步通过 NCCL AllReduce 直接在 GPU 间完成;
- 最终指标通过 Ray 的
ray.get()汇总(小数据量,不影响性能)。
NCCL vs GLOO 选择
| 后端 | 适用场景 | 特点 |
|---|---|---|
| NCCL | GPU 训练 | • 高性能(专为 NVIDIA GPU 优化) • 支持 InfiniBand/RoCE • 仅支持 GPU 张量 |
| GLOO | CPU 训练 / 小集群 | • 支持 CPU 张量 • 基于 TCP,兼容性好 • 性能低于 NCCL |
✅ 在 GPU 集群上,始终优先使用 NCCL。
总结
- Ray 的角色:提供 Actor 生命周期管理 + 资源调度 + 跨节点部署;
- NCCL/GLOO 的角色:提供 高性能 collective 通信;
- 协同方式:Ray 负责“把人叫齐并分配座位”,NCCL 负责“让他们高效对话”;
- 数据流分离:
- 控制流:通过 Ray(启动、协调、收集结果);
- 数据流:通过 NCCL(梯度同步,绕过 Ray)。
这种设计既利用了 Ray 的易用性和弹性,又保留了底层通信库的极致性能,是构建大规模 AI 系统的理想架构。
建议:
对于生产级训练,推荐使用 Ray Train(ray.train.torch.TorchTrainer),它封装了上述细节,并提供自动容错、混合精度、日志集成等能力。
Ray的OOM问题和背景
因为在实际过程中确实非常容易出现OOM的问题, 而且在程序运行之前其实很难预料要使用多少内存, 很多只是基于“经验值”想要“安全的”跑任务就要提高经验值的阈值, 容易造成内存的浪费?
这个问题非常真实,也确实是 Ray 在实际生产中面临的核心挑战之一:
“内存需求难以预估 → 保守估计导致资源浪费,激进估计导致 OOM”。
这本质上反映了 分布式系统中“资源弹性”与“资源安全”之间的根本矛盾。我们来深入分析这个问题的根源、现状以及可能的缓解路径。
为什么内存难以准确预估?
-
任务行为动态性强
- 例如 RL 中的 rollout 长度可变、LLM 推理 batch size 动态调整、数据预处理中样本大小不一。
- Python 对象内存占用本身就不透明(如 list 扩容、pandas DataFrame 内存碎片)。
-
依赖外部输入
- 任务内存消耗取决于输入数据大小,而输入往往在运行时才知道。
-
框架开销不可控
- Ray 的序列化(pickle/cloudpickle)、对象存储复制、中间缓存等都会额外占用内存,且难以建模。
-
缺乏运行时反馈机制
- 当前 Ray 的
memory=是静态声明,无法根据实际使用动态调整或拒绝超限任务。
- 当前 Ray 的
当前设计的两难困境
| 策略 | 后果 |
|---|---|
| 按经验值设高阈值(如声明 2GB,实际用 500MB) | ✅ 避免 OOM ❌ 资源利用率低(调度器认为用了 2GB,实际只用 0.5GB → 并行度下降) |
| 按理论值设低阈值(如声明 500MB,实际峰值 1.8GB) | ✅ 提高并行度 ❌ 极易 OOM,节点崩溃 |
这就是你所说的:“为了安全跑任务,不得不浪费内存”。
Ray 社区是否意识到这个问题?——是的,且正在改进
已落地的改进:
ray memory命令
可查看每个 ObjectRef 的内存占用,帮助事后分析。- 自动 Spill to Disk(对象存储溢出)
当 shared memory 满时,自动写入磁盘(需配置),避免 OOM(但性能下降)。 - Placement Groups + 弹性调度
可预留资源池,隔离关键任务。
正在推进的方向(GitHub / RFC):
- 运行时内存监控 + 任务驱逐:当 worker 内存超限时,自动取消任务(类似 Kubernetes OOMKiller)。
- 动态资源请求:允许任务在运行中“申请更多内存”(尚未支持)。
- 基于历史的自适应资源估算:类似 Spark 的 Adaptive Query Execution,根据前几次运行自动调整资源请求。
实用建议:如何在现有框架下平衡安全与效率?
1. 采用“分层资源策略”
- 关键任务:保守估计 + Placement Group 预留
- 弹性任务:激进估计 + 设置超时/重试 + 监控 OOM
2. 用容器/K8s 提供硬隔离
# KubeRay Pod spec 示例
resources:
requests:
memory: "1Gi" # Ray 调度用(软)
limits:
memory: "2Gi" # K8s 强制限制(硬)
这样即使 Ray 任务超用,K8s 会 kill 容器而非整个节点。
主动探测内存使用
在任务开始时采样内存:
import psutil
@ray.remote
def my_task(data):
start_mem = psutil.Process().memory_info().rss
# ... do work ...
end_mem = psutil.Process().memory_info().rss
print(f"Used {(end_mem - start_mem) / 1e6:.1f} MB")
积累几次后,用 P95 作为 memory= 的参考值。
限制单个节点的任务并发数
通过 num_cpus=1 或自定义资源(如 @ray.remote(resources={"task_slot": 1}))控制密度,避免“雪崩式 OOM”。
启用对象存储溢出(Spilling)
ray start --object-store-memory=4000000000 \
--object-spilling-config='{"type":"filesystem","params":{"directory_path":"/tmp/spill"}}'
即使内存超用,也能 fallback 到磁盘,保住节点不崩溃(牺牲性能换可用性)。
更深层思考:这是 Ray 的问题,还是分布式系统的通病?
其实,几乎所有通用分布式框架都面临类似问题:
| 框架 | 资源模型 | 是否硬隔离 | 内存预估难度 |
|---|---|---|---|
| Spark | spark.executor.memory |
否(依赖 YARN/K8s) | 高(需调优) |
| Dask | memory_limit(per worker) |
否 | 高 |
| Flink | taskmanager.memory |
否(需配 JVM) | 中 |
| Ray | memory=(per task) |
否 | 极高(因任务粒度细) |
💡 任务越细粒度,资源预估越难。Ray 的“微任务”模型放大了这一问题。
总结:不是设计失败,而是领域难题
- Ray 的“约定式资源”不是 bug,而是为性能和灵活性做出的 conscious trade-off。
- OOM 风险确实存在,且对新手不友好,但这可以通过部署架构(如容器化)、监控工具和经验积累来缓解。
- 理想方案是“软声明 + 硬兜底”:Ray 负责高效调度,底层平台(K8s/cgroups)负责安全隔离——这也是工业界的标准实践。
最终建议:
不要试图在裸机上“完美配置 Ray 内存”,而是 把 Ray 放在一个有资源边界的环境中运行(如 Docker/K8s),让它专注做擅长的事——快速、灵活地执行分布式任务。
更多推荐



所有评论(0)