cann组织链接:https://atomgit.com/cann
pypto仓库链接:https://atomgit.com/cann/pypto

前言

在大模型训练与推理日益依赖多设备协同的背景下,如何高效地将计算任务分配到不同硬件单元,并管理它们之间的数据流动,已成为 AI 框架的核心挑战。CANN 生态中的 PyPTO(Parallel Tensor/Tile Operation)项目,作为一套面向 Tile 级并行的 Python 编程范式,不仅提供了类 PyTorch 的高层表达能力,更在底层构建了一套精细的设备放置(Device Placement)与跨设备调度(Cross-Device Scheduling)抽象机制

一、设备抽象模型:统一异构资源视图

PyPTO 的一切调度始于对硬件资源的统一建模。它不直接暴露物理设备 ID,而是通过 DeviceSpecDeviceMesh 两个核心抽象,构建逻辑设备视图。

1.1 DeviceSpec:设备能力描述

每个物理或逻辑设备由 DeviceSpec 描述,包含:

  • 类型("npu", "cpu"
  • 内存容量与带宽
  • 计算单元数量
  • 通信能力(是否支持 P2P、带宽)
# pypto/core/device.py
@dataclass
class DeviceSpec:
    type: str          # "npu", "cpu"
    id: int            # 逻辑 ID
    memory_gb: float
    compute_units: int
    peer_access: List[int]  # 可直连的设备 ID 列表

该信息由 CANN runtime 在初始化时探测并注册。

1.2 DeviceMesh:逻辑设备网格

DeviceMesh 将多个 DeviceSpec 组织为 N 维网格,用于表达并行策略(如数据并行、张量并行):

# 创建 2x2 NPU 网格
mesh = DeviceMesh(
    shape=(2, 2),
    devices=[0, 1, 2, 3],  # 逻辑设备 ID
    axis_names=("dp", "tp")  # 数据并行轴、张量并行轴
)

DeviceMesh 是后续放置策略的拓扑基础,支持 reshape、transpose 等操作以匹配不同并行模式。


二、设备放置策略:从声明到绑定

PyPTO 支持两种设备放置方式:显式声明自动推导

2.1 显式放置:with device_context()

开发者可通过上下文管理器显式指定设备:

from pypto import device_context, npu

with device_context(npu(0)):
    x = pt.randn(1024, 1024)
    y = pt.matmul(x, x.T)

该语法糖背后调用 PlacementPolicy.bind(tensor, device),将张量 x 和后续算子绑定到 npu(0)

2.2 自动放置:基于计算图的推导

对于复杂模型,PyPTO 提供 AutoPlacementPolicy,根据以下规则自动分配设备:

  1. 输入张量继承来源设备
  2. 算子优先放置在输入张量所在设备
  3. 若输入来自多个设备,则选择主设备(majority vote)或插入通信
  4. 内存压力过大时触发迁移(migration)

关键实现在 pypto/scheduler/placement.py

# pypto/scheduler/placement.py
class AutoPlacementPolicy:
    def place_op(self, op: OpNode, graph: ComputeGraph) -> DeviceSpec:
        input_devices = [t.device for t in op.inputs]
        if len(set(input_devices)) == 1:
            return input_devices[0]  # 同设备,直接放置
        else:
            # 多设备输入:选择计算能力最强的设备
            candidate = max(input_devices, key=lambda d: d.compute_units)
            # 插入隐式通信节点(如 AllGather)
            self._insert_comm_nodes(op, candidate)
            return candidate

该策略确保最小化跨设备数据移动,是低延迟调度的前提。


三、跨设备调度抽象:调度器与事件图

放置完成后,PyPTO 的 CrossDeviceScheduler 负责生成可执行的调度计划。

3.1 调度单元:Task 与 Event

PyPTO 将每个算子执行、内存拷贝、通信操作封装为 Task,并构建 EventGraph 表达依赖关系:

# pypto/scheduler/task.py
@dataclass
class Task:
    id: str
    op: OpNode
    device: DeviceSpec
    deps: List[str]  # 依赖的 Task ID
    type: Literal["compute", "memcpy", "comm"]

例如,一个跨设备 matmul 可能生成:

  • Task A: matmul on NPU 0
  • Task B: memcpy from NPU 0 → NPU 1
  • Task C: add on NPU 1 (依赖 B)

3.2 调度器:拓扑感知的任务分发

CrossDeviceScheduler 根据 DeviceMesh 的通信拓扑优化任务顺序:

# pypto/scheduler/scheduler.py
class CrossDeviceScheduler:
    def schedule(self, event_graph: EventGraph) -> ExecutionPlan:
        # 1. 拓扑排序
        ordered_tasks = topological_sort(event_graph)
        
        # 2. 通信路径优化:使用最短路径(如 NVLink > PCIe)
        for task in ordered_tasks:
            if task.type == "comm":
                src, dst = task.src_dev, task.dst_dev
                path = self.topology.shortest_path(src.id, dst.id)
                task.comm_path = path  # 记录路由
        
        # 3. 生成设备本地任务队列
        per_device_queues = defaultdict(list)
        for task in ordered_tasks:
            per_device_queues[task.device.id].append(task)
        
        return ExecutionPlan(per_device_queues)

该设计使得通信开销可预测、可优化


四、执行引擎:多流并行与事件同步

最终,ExecutionPlan 被提交给 MultiStreamExecutor,在每个设备上启动独立执行流。

4.1 设备本地流(Stream)

每个设备维护多个 Stream(类似 CUDA Stream),用于重叠计算与通信:

// pypto/runtime/stream_manager.cc (C++ backend)
class StreamManager {
public:
    Stream* get_compute_stream(int dev_id);
    Stream* get_comm_stream(int dev_id);
    
    void submit(Task& task, Stream* stream);
};

PyPTO 默认为每个设备分配:

  • 1 个计算流
  • 1 个通信流
  • 1 个内存拷贝流

4.2 跨设备事件同步

当 Task C 依赖 Task B(跨设备),PyPTO 使用 事件信号(Event Signal) 实现同步:

# 伪代码:跨设备同步
event = runtime.record_event(comm_stream_npu0)
runtime.wait_event(compute_stream_npu1, event)

底层通过 CANN 驱动提供的 跨设备事件接口 实现,避免 CPU 轮询,降低延迟。


五、高级特性:动态放置与弹性调度

PyPTO 还支持运行时调整放置策略,以应对负载不均或故障。

5.1 动态迁移(Migration)

通过 pt.move(tensor, target_device) 可触发张量迁移:

# 将张量迁移到 NPU 1
x_moved = pt.move(x, npu(1))
# 后续计算自动在 NPU 1 执行
y = pt.relu(x_moved)

迁移过程由调度器自动插入 memcpy Task,并更新张量的设备绑定。

5.2 弹性容错

若某设备故障,PyPTO 可从检查点恢复,并重新分配任务到健康设备:

# 注册容错回调
pypto.register_fault_handler(lambda dev: redistribute_tasks(dev))

该机制依赖 CANN runtime 的设备健康监控接口。


六、与 CANN 生态的集成

PyPTO 的调度抽象并非孤立存在,而是深度集成于 CANN 全栈:

  • 与 GE 图引擎协同:PyPTO 计算图可被序列化为 GE IR,复用其图优化能力;
  • 与 HCCL 通信库对接:跨设备通信 Task 直接调用 hccl::AllReduce 等原语;
  • 与 Runtime 内存管理对齐:张量内存由 runtime.allocate() 分配,确保设备一致性。

这种集成使得 PyPTO 既能作为独立编程框架使用,又能无缝嵌入现有 CANN 应用。


结语

CANN PyPTO 通过 DeviceSpec/DeviceMesh 抽象 + 自动放置策略 + 跨设备事件图调度 + 多流执行引擎 的四层架构,构建了一套灵活而高效的设备管理与调度系统。它既满足了科研人员对编程简洁性的需求,又为工业级部署提供了底层控制能力。在 2026 年的演进中,PyPTO 正进一步探索 编译时静态调度运行时动态调整 的融合,朝着“一次编写,随处高效运行”的目标稳步迈进。


cann组织链接:https://atomgit.com/cann
pypto仓库链接:https://atomgit.com/cann/pypto

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐