Part 4: Kubelet Volume Manager、Container Manager 与 Plugin System 超深度分析


目录


一、模块定位

1.1 Volume Manager — 存储生命周期编排器

业务职责:Volume Manager 是 Kubelet 中负责节点级别卷(Volume)生命周期管理的核心子系统。它监控调度到本节点的 Pod,确定哪些卷需要被挂载(Mount)/卸载(Unmount)、附加(Attach)/分离(Detach),并通过异步循环驱动实际操作执行。

源码位置pkg/kubelet/volumemanager/

核心设计理念

  • ASW/DSW 双缓存模型:Desired State of World(期望状态)+ Actual State of World(实际状态),通过 Reconciler 持续比对并驱动收敛
  • 声明式驱动:Pod 声明需要什么卷 → Populator 更新 DSW → Reconciler 发现差异 → OperationExecutor 执行操作 → ASW 更新
  • CSI 迁移兼容:通过 csiMigratedPluginManager 和 intreeToCSITranslator 支持 in-tree 插件向 CSI 平滑迁移

1.2 Container Manager — 资源分配与隔离指挥中心

业务职责:Container Manager(CM)是 Kubelet 中管理容器资源分配与隔离的核心子系统,涵盖 cgroup 管理、CPU 亲和性、内存亲和性、设备分配、NUMA 拓扑协调等。它确保 Pod 的资源请求在节点上得到正确实施,包括 QoS 分级、Node Allocatable 约束、以及硬件拓扑感知的资源分配。

源码位置pkg/kubelet/cm/

核心设计理念

  • 层级化管理:Container Manager → 子 Manager(CPU/Memory/Device/Topology)→ Policy → State
  • Topology-First 策略:Topology Manager 协调所有 Hint Provider(CPU/Memory/Device),在 Pod 准入阶段统一计算 NUMA 亲和性
  • Checkpoint 持久化:各子 Manager 的分配状态通过 Checkpoint Manager 持久化到磁盘,支持 Kubelet 重启后恢复

1.3 Plugin Manager — 插件注册发现协调器

业务职责:Plugin Manager 负责监听节点上插件 Socket 文件的创建与删除,协调插件的注册(Register)与反注册(Deregister)流程。它是 Device Plugin、CSI Plugin 等第三方扩展与 Kubelet 交互的桥梁。

源码位置pkg/kubelet/pluginmanager/

核心设计理念

  • 文件系统驱动:通过 fsnotify 监听 Socket 目录,将文件创建/删除事件转化为插件注册/反注册意图
  • ASW/DSW 模型:与 Volume Manager 采用相同的 Desired/Actual State 模式
  • Handler 抽象:通过 PluginHandler 接口将具体的注册逻辑委托给消费者(如 Device Manager)

二、Volume Manager 深度解析

2.1 模块整体结构

2.1.1 核心类型与接口
// VolumeManager 顶层接口
type VolumeManager interface {
    Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
    WaitForAttachAndMount(pod *v1.Pod) error
    GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
    GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
    GetVolumesInUse() []v1.UniqueVolumeName
    ReconcilerStatesHasBeenSynced() bool
    VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
    MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
// volumeManager 具体实现
type volumeManager struct {
    kubeClient          clientset.Interface
    volumePluginMgr     *volume.VolumePluginMgr
    desiredStateOfWorld cache.DesiredStateOfWorld     // 期望状态缓存
    actualStateOfWorld  cache.ActualStateOfWorld       // 实际状态缓存
    operationExecutor   operationexecutor.OperationExecutor  // 操作执行器
    reconciler          reconciler.Reconciler          // 协调器
    desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator  // 期望状态填充器
    csiMigratedPluginManager csimigration.PluginManager
    intreeToCSITranslator     csimigration.InTreeToCSITranslator
}
2.1.2 关键常量
常量 含义
reconcilerLoopSleepPeriod 100ms Reconciler 循环间隔
desiredStateOfWorldPopulatorLoopSleepPeriod 100ms Populator 循环间隔
desiredStateOfWorldPopulatorGetPodStatusRetryDuration 2s Pod 状态重试间隔
podAttachAndMountTimeout 2m3s Pod 卷挂载超时
podAttachAndMountRetryInterval 300ms Pod 卷挂载重试间隔
waitForAttachTimeout 10m 等待 Attach 超时
2.1.3 依赖注入关系

Volume Manager 的创建通过 NewVolumeManager() 工厂函数完成,注入以下依赖:

  • kubeClient:API Server 通信
  • volumePluginMgr:卷插件管理器(必须预初始化)
  • mounter:文件系统挂载接口
  • hostutil:主机工具接口
  • blockVolumePathHandler:块卷路径处理

内部组件的依赖关系为:

volumeManager
├── desiredStateOfWorld (依赖 volumePluginMgr)
├── actualStateOfWorld (依赖 nodeName, volumePluginMgr)
├── operationExecutor
│   └── operationGenerator (依赖 kubeClient, volumePluginMgr, recorder, blockVolumePathHandler)
├── reconciler (依赖所有上述组件 + mounter, hostutil)
└── desiredStateOfWorldPopulator (依赖 kubeClient, podManager, podStatusProvider, DSW, ASW, containerRuntime, csiMigratedPluginManager)

2.2 Desired State of World (DSW) 深度解析

DSW 是 Volume Manager 的"意图存储",记录"哪些 Pod 需要哪些卷"。

2.2.1 接口定义
type DesiredStateOfWorld interface {
    AddPodToVolume(podName volumetypes.UniquePodName, pod *v1.Pod, 
        volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGidValue string) (v1.UniqueVolumeName, error)
    DeletePodFromVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName)
    MarkVolumesReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
    AddErrorToPod(podName volumetypes.UniquePodName, err string)
    PopPodErrors(podName volumetypes.UniquePodName) []string
    GetVolumesToMount() []VolumeToMount
    PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, error)
    VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool
    VolumeExists(volumeName v1.UniqueVolumeName) bool
    MarkVolumeAttachability(volumeName v1.UniqueVolumeName, attachable bool)
    GetPodsWithErrors() []volumetypes.UniquePodName
}
2.2.2 核心数据结构

DSW 内部维护两个关键映射:

  • volumesToMount: map[v1.UniqueVolumeName]volumeToMount — 卷名到卷挂载信息的映射
  • podsToMarkUnmounted: 已标记为卸载的 Pod 集合

每个 volumeToMount 条目包含:

  • volumeName: 唯一卷名
  • pod: Pod 对象引用
  • volumeSpec: 卷规格
  • pluginIsAttachable: 插件是否支持 Attach
  • reportedInUse: 是否已报告为使用中
  • desiredSizeLimit: 期望的卷大小限制
2.2.3 AddPodToVolume 流程
  1. 根据 volumeSpecvolumePluginMgr 查找对应插件
  2. 判断插件是否支持 Attach 操作
  3. 根据 PV/PVC 信息生成 UniqueVolumeName
  4. 如果卷已存在于 DSW 中,仅添加 Pod 到该卷的 podsToMount 映射
  5. 如果卷不存在,创建新的 volumeToMount 条目
  6. 所有操作在写锁保护下进行

2.3 Actual State of World (ASW) 深度解析

ASW 是 Volume Manager 的"事实存储",记录"哪些卷实际上已经附加/挂载到本节点"。

2.3.1 接口定义
type ActualStateOfWorld interface {
    MarkVolumeAsAttached(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, 
        nodeName types.NodeName, devicePath string) error
    MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error
    MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
    MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
    MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath string, deviceMountPath string) error
    MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
    MarkRemountRequired(podName volumetypes.UniquePodName)
    MarkFSResizeRequired(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName)
    GetMountedVolumesForPod(podName volumetypes.UniquePodName) []MountedVolume
    GetAllMountedVolumes() []MountedVolume
    GetUnmountedVolumes() []AttachedVolume
    GetAttachedVolumes() []AttachedVolume
    GetMountedVolumes() []MountedVolume
    GetVolumesInUse() []v1.UniqueVolumeName
    VolumeExists(volumeName v1.UniqueVolumeName) bool
    VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool
    PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, error)
}
2.3.2 核心数据结构

ASW 内部维护三个关键映射:

  • attachedVolumes: map[v1.UniqueVolumeName]attachedVolume — 已附加的卷
  • mountedVolumes: 从 attachedVolume 中派生,记录卷到 Pod 的挂载关系
  • deviceMountPaths: map[v1.UniqueVolumeName]deviceMountPath — 设备挂载路径

每个 attachedVolume 包含:

  • volumeName: 唯一卷名
  • volumeSpec: 卷规格
  • nodeName: 节点名
  • devicePath: 设备路径
  • deviceMountPath: 设备挂载路径
  • pluginIsAttachable: 是否支持 Attach
  • deviceMayBeMounted: 设备可能已挂载标记
  • mountedPods: map[volumetypes.UniquePodName]mountedPod — 已挂载到此卷的 Pod

2.4 DesiredStateOfWorldPopulator 深度解析

Populator 是 DSW 的数据来源,它定期从 PodManager 获取 Pod 列表,将卷信息注入 DSW。

2.4.1 核心数据结构
type desiredStateOfWorldPopulator struct {
    kubeClient                clientset.Interface
    loopSleepDuration         time.Duration
    getPodStatusRetryDuration time.Duration
    podManager                pod.Manager
    podStatusProvider         status.PodStatusProvider
    desiredStateOfWorld       cache.DesiredStateOfWorld
    actualStateOfWorld        cache.ActualStateOfWorld
    pods                      processedPods
    kubeContainerRuntime      kubecontainer.Runtime
    timeOfLastGetPodStatus    time.Time
    keepTerminatedPodVolumes  bool
    hasAddedPods              bool
    hasAddedPodsLock          sync.RWMutex
    csiMigratedPluginManager  csimigration.PluginManager
    intreeToCSITranslator     csimigration.InTreeToCSITranslator
    volumePluginMgr           *volume.VolumePluginMgr
}

type processedPods struct {
    processedPods map[volumetypes.UniquePodName]bool
    sync.RWMutex
}
2.4.2 运行流程

Populator 的 Run() 方法启动两个阶段:

阶段一:等待所有 Source 就绪

wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
    done := sourcesReady.AllReady()
    dswp.populatorLoop()
    return done, nil
}, stopCh)
dswp.hasAddedPods = true

阶段二:持续循环

wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)

每个 populatorLoop() 执行两个操作:

  1. findAndAddNewPods():遍历 PodManager 中的所有 Pod,将新 Pod 的卷添加到 DSW
  2. findAndRemoveDeletedPods():检查 DSW 中的 Pod 是否仍然存在,删除已终止 Pod 的卷
2.4.3 processPodVolumes 逐行解析
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
    pod *v1.Pod,
    mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
    processedVolumesForFSResize sets.String) {
    // 1. nil 检查
    if pod == nil { return }
    
    // 2. 去重:如果 Pod 已处理过,跳过
    uniquePodName := util.GetUniquePodName(pod)
    if dswp.podPreviouslyProcessed(uniquePodName) { return }
    
    // 3. 获取 Pod 的所有卷名(mounts 和 devices 分别处理)
    allVolumesAdded := true
    mounts, devices := util.GetPodVolumeNames(pod)
    
    // 4. 遍历 Pod Spec 中的每个 Volume
    for _, podVolume := range pod.Spec.Volumes {
        // 4a. 跳过 Pod 未使用的卷
        if !mounts.Has(podVolume.Name) && !devices.Has(podVolume.Name) { continue }
        
        // 4b. 创建 VolumeSpec(处理 PVC 解引用、CSI 迁移等)
        pvc, volumeSpec, volumeGidValue, err := dswp.createVolumeSpec(podVolume, pod, mounts, devices)
        if err != nil {
            dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
            allVolumesAdded = false
            continue
        }
        
        // 4c. 添加到 DSW
        _, err = dswp.desiredStateOfWorld.AddPodToVolume(
            uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
        
        // 4d. 如果启用了在线扩容,检查文件系统是否需要 Resize
        if expandInUsePV {
            dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, 
                uniquePodName, mountedVolumesForPod, processedVolumesForFSResize)
        }
    }
    
    // 5. 标记 Pod 已处理
    if allVolumesAdded {
        dswp.markPodProcessed(uniquePodName)
        dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
        dswp.desiredStateOfWorld.PopPodErrors(uniquePodName)
    }
}
2.4.4 createVolumeSpec 深度解析

这是 Populator 最复杂的方法之一,负责将 Pod Spec 中的 Volume 声明解析为具体的 VolumeSpec:

  1. PVC 卷:调用 getPVCExtractPV() 从 API Server 获取 PVC → 解析出 PV → 调用 getPVSpec() 获取 PV Spec
    • 验证 PVC 是否在删除中(StorageObjectInUseProtection feature gate)
    • 验证 PVC 是否已 Bound
    • 检查卷模式(Block vs Filesystem)与 Pod 中的使用方式是否匹配
  2. Ephemeral 内联卷:如果 feature gate 启用,将其视为 PVC 处理
  3. In-tree 卷:直接创建 VolumeSpec
  4. CSI 迁移:对于所有可迁移的卷,调用 TranslateInTreeSpecToCSI() 将 in-tree Spec 翻译为 CSI Spec

2.5 Reconciler 深度解析

Reconciler 是 Volume Manager 的执行引擎,它持续比对 DSW 和 ASW,驱动操作执行。

2.5.1 核心数据结构
type reconciler struct {
    kubeClient                    clientset.Interface
    controllerAttachDetachEnabled bool       // AD Controller 是否接管 Attach/Detach
    loopSleepDuration             time.Duration
    waitForAttachTimeout          time.Duration
    nodeName                      types.NodeName
    desiredStateOfWorld           cache.DesiredStateOfWorld
    actualStateOfWorld            cache.ActualStateOfWorld
    populatorHasAddedPods         func() bool
    operationExecutor             operationexecutor.OperationExecutor
    mounter                       mount.Interface
    hostutil                      hostutil.HostUtils
    volumePluginMgr               *volumepkg.VolumePluginMgr
    kubeletPodsDir                string
    timeOfLastSync                time.Time   // 最后一次 sync 时间
}
2.5.2 reconciliationLoopFunc
func (rc *reconciler) reconciliationLoopFunc() func() {
    return func() {
        rc.reconcile()  // 主协调逻辑
        
        // 当 Populator 已完成 Pod 添加且尚未同步过状态时,执行一次性 sync
        if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
            rc.sync()  // 从磁盘扫描重建卷状态
        }
    }
}
2.5.3 reconcile() 三阶段流程

阶段一:卸载不再需要的卷(unmountVolumes)

func (rc *reconciler) unmountVolumes() {
    for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
        // 检查 Pod 是否仍在 DSW 中引用此卷
        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
            // 卷已挂载但不再需要 → 触发 UnmountVolume
            rc.operationExecutor.UnmountVolume(mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
        }
    }
}

关键点:卸载在挂载之前执行,确保一个卷从旧 Pod 卸载后才挂载到新 Pod。

阶段二:挂载/附加需要的卷(mountAttachVolumes)

func (rc *reconciler) mountAttachVolumes() {
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        
        if cache.IsVolumeNotAttachedError(err) {
            // 卷未附加 → 分两种情况
            if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
                // 情况A: AD Controller 负责,或插件不支持 Attach → 验证 Controller 是否已附加
                rc.operationExecutor.VerifyControllerAttachedVolume(...)
            } else {
                // 情况B: Kubelet 自己负责 Attach → 执行 AttachVolume
                rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
            }
        } else if !volMounted || cache.IsRemountRequiredError(err) {
            // 卷已附加但未挂载,或需要重新挂载 → 执行 MountVolume
            rc.operationExecutor.MountVolume(...)
        } else if cache.IsFSResizeRequiredError(err) {
            // 文件系统需要在线扩容 → 执行 ExpandInUseVolume
            rc.operationExecutor.ExpandInUseVolume(...)
        }
    }
}

阶段三:分离/卸载设备(unmountDetachDevices)

func (rc *reconciler) unmountDetachDevices() {
    for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
        // 卷已附加但无 Pod 挂载,且不在 DSW 中
        if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
           !rc.operationExecutor.IsOperationPending(...) {
            
            if attachedVolume.DeviceMayBeMounted() {
                // 设备全局挂载中 → 先 UnmountDevice
                rc.operationExecutor.UnmountDevice(...)
            } else {
                // 设备未全局挂载 → Detach 或标记已分离
                if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
                    rc.actualStateOfWorld.MarkVolumeAsDetached(...)
                } else {
                    rc.operationExecutor.DetachVolume(...)
                }
            }
        }
    }
}
2.5.4 syncStates — Kubelet 重启后卷状态重建

syncStates() 是 Reconciler 中最复杂的部分,在 Kubelet 重启后执行一次,从磁盘扫描 Pod 目录重建卷状态:

  1. 扫描 /pods/{podUID}/volume/{pluginName}/{volumeName}/pods/{podUID}/volumeDevices/{pluginName}/{volumeName}
  2. 对每个发现的卷:
    • 如果 ASW 中已存在 → 跳过
    • 尝试 reconstructVolume():通过 plugin 重建 VolumeSpec、Mounter/BlockVolumeMapper
    • 如果重建成功且卷在 DSW 中 → 标记为 InUse(避免误卸载)
    • 如果重建成功但不在 DSW 中 → 添加到 ASW(孤儿卷恢复)
    • 如果重建失败且不在 DSW 中 → cleanupMounts() 清理
  3. 调用 updateDevicePath() 从 Node Status 的 VolumesAttached 获取设备路径

2.6 OperationExecutor

OperationExecutor 是卷操作的异步执行器,通过 goroutinemapnestedpendingoperations 确保同一卷不会同时执行多个操作。关键操作:

  • AttachVolume:调用 Volume Plugin 的 Attacher.Attach()
  • DetachVolume:调用 Detacher.Detach()
  • MountVolume:调用 Mounter.SetUp() 或 DeviceMounter.MountDevice()
  • UnmountVolume:调用 Mounter.TearDown()
  • UnmountDevice:调用 DeviceMounter.UnmountDevice()
  • VerifyControllerAttachedVolume:轮询 Node Status 等待 AD Controller 完成附加
  • ExpandInUseVolume:在线扩容
  • ReconstructVolumeOperation:从磁盘重建卷

三、Container Manager 深度解析

3.1 模块整体结构

3.1.1 核心接口
// ContainerManager 顶层接口
type ContainerManager interface {
    Start(node *v1.Node, activePods ActivePodsFunc, sourcesReady config.SourcesReady, 
        podStatusProvider status.PodStatusProvider, runtimeService internalapi.RuntimeService) error
    NewPodContainerManager() PodContainerManager
    GetPodCgroupRoot() string
    GetMountedSubsystems() *CgroupSubsystems
    GetQOSContainersInfo() QOSContainersInfo
    UpdateQOSCgroups() error
    Status() Status
    GetNodeConfig() NodeConfig
    GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)
    GetPluginRegistrationHandler() cache.PluginHandler
    UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
    GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
    InternalContainerLifecycle() InternalContainerLifecycle
    SystemCgroupsLimit() v1.ResourceList
}
3.1.2 containerManagerImpl 结构
type containerManagerImpl struct {
    sync.RWMutex
    cadvisorInterface   cadvisor.Interface
    mountUtil           mount.Interface
    NodeConfig
    status              Status
    systemContainers    []*systemContainer
    periodicTasks       []func()
    subsystems          *CgroupSubsystems
    nodeInfo            *v1.Node
    cgroupManager       CgroupManager
    capacity            v1.ResourceList
    internalCapacity    v1.ResourceList
    cgroupRoot          CgroupName
    recorder            record.EventRecorder
    qosContainerManager QOSContainerManager
    deviceManager       devicemanager.Manager
    cpuManager          cpumanager.Manager
    memoryManager       memorymanager.Manager
    topologyManager     topologymanager.Manager
}
3.1.3 NodeConfig 关键配置
type NodeConfig struct {
    CgroupRoot                         string
    CgroupsPerQOS                      bool
    CgroupDriver                       string
    SystemCgroupsName                  string
    KubeletCgroupsName                 string
    ContainerRuntime                   string
    ExperimentalCPUManagerPolicy       string
    ExperimentalCPUManagerReconcilePeriod time.Duration
    ExperimentalMemoryManagerPolicy    string
    ExperimentalMemoryManagerReservedMemory []kubeletconfig.MemoryReservation
    ExperimentalTopologyManagerPolicy  string
    ExperimentalTopologyManagerScope   string
    ReservedSystemCPUs                 string
    NodeAllocatableConfig
    KubeletRootDir                     string
}

3.2 初始化流程深度解析

NewContainerManager() 执行以下初始化步骤:

1. validateSystemRequirements() → 检查 cgroup 子系统是否挂载(cpu, cpuacct, cpuset, memory)
2. failSwapOn 检查 → 读取 /proc/swaps 验证 swap 未开启
3. 获取 MachineInfo → 计算节点 Capacity
4. 解析 CgroupRoot → 确保根 cgroup 存在
5. 初始化 QOSContainerManager
6. 初始化 TopologyManager(如果 feature gate 启用)
7. 初始化 DeviceManager → 注册为 HintProvider
8. 初始化 CPUManager → 注册为 HintProvider  
9. 初始化 MemoryManager → 注册为 HintProvider
10. 返回 containerManagerImpl 实例

Start() 方法执行:

1. 启动 CPUManager(从 runtime 获取初始容器映射)
2. 启动 MemoryManager
3. 缓存 NodeInfo
4. 验证 Node Allocatable 配置
5. setupNode() → 设置 QoS cgroup、系统容器、内核参数
6. 启动 DeviceManager

3.3 CgroupManager 深度解析

3.3.1 接口定义
type CgroupManager interface {
    Exists(name CgroupName) bool
    Create(name CgroupName, resource *ResourceConfig) error
    Destroy(name CgroupName) error
    Update(name CgroupName, resource *ResourceConfig) error
    Name(name CgroupName) string
    Pids(name CgroupName) []int
    ReduceCPULimits(cgroupName CgroupName) error
    GetResourceStats(name CgroupName) (*ResourceStats, error)
}
3.3.2 CgroupName 与路径映射

CgroupName 是一个字符串切片,表示 cgroup 层级路径。两种驱动方式有不同的路径映射:

cgroupfs 驱动

CgroupName{"kubepods", "burstable", "pod1234"} → /kubepods/burstable/pod1234/

systemd 驱动

CgroupName{"kubepods", "burstable", "pod1234"} → /kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod1234.slice/

systemd 驱动的转换规则:

  • 每个组件添加 .slice 后缀
  • 层级间用 - 连接
  • 组件名中的 - 转义为 _escapeSystemdCgroupName
  • 使用 cgroupsystemd.ExpandSlice() 生成完整路径
3.3.3 cgroup 层次结构

CgroupsPerQOS=true 时,cgroup 层次如下:

/kubepods.slice/                          (cgroupRoot)
├── /kubepods-pod1234.slice/              (Pod 级 cgroup, QoS=Guaranteed)
├── /kubepods-burstable.slice/            (Burstable QoS 级)
│   ├── /kubepods-burstable-pod5678.slice/ (Pod 级 cgroup)
│   └── ...
├── /kubepods-besteffort.slice/           (BestEffort QoS 级)
│   ├── /kubepods-besteffort-pod9012.slice/
│   └── ...
└── /kubepods-pod1234.slice/              (Guaranteed Pod 直接在 kubepods 下)

3.4 CPU Manager 深度解析

3.4.1 接口定义
type Manager interface {
    Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, 
        podStatusProvider status.PodStatusProvider, runtimeService internalapi.RuntimeService, 
        initialContainers containermap.ContainerMap) error
    Allocate(pod *v1.Pod, container *v1.Container) error
    RemoveContainer(containerID string) error
    State() state.Reader
    GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
    GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
    GetHintPorviderName() string
    GetAllocatableCPUs() cpuset.CPUSet
}
3.4.2 Static Policy 核心逻辑

Static Policy 是 CPU Manager 的核心策略,仅对 Guaranteed QoS 的 Pod 进行静态 CPU 分配:

准入条件

  • Pod QoS 为 Guaranteed(CPU Request == CPU Limit,且均为整数值)
  • 请求的 CPU 为整数(如 1, 2, 4

分配流程

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
    // 1. 非 Guaranteed Pod → 跳过
    if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { return nil }
    
    // 2. 幂等检查:如果容器已有分配 → 跳过
    if s.GetCPUAssignments(string(pod.UID), container.Name) != nil { return nil }
    
    // 3. 计算需要的 CPU 数量
    numCPUs := calculateCPUs(pod, container)
    if numCPUs == 0 { return nil }
    
    // 4. 从 TopologyManager 获取 NUMA 亲和性
    hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
    
    // 5. 基于 hint 计算可用的 CPU 集合
    //    如果 hint 有 NUMA 亲和性 → 在指定 NUMA 节点上分配
    //    否则 → 从默认 cpuset 分配
    
    // 6. 执行分配,更新 state
    //    - 从 freeCPUs 中移除已分配的 CPU
    //    - 更新 defaultCPUs(非 Guaranteed Pod 可用的 CPU)
    //    - 记录分配关系到 checkpoint
}

Reconcile 机制

CPU Manager 启动一个独立的 reconcile 循环,定期:

  1. 获取所有活跃 Pod
  2. 检查每个 Pod 的容器是否仍在运行
  3. 清理已终止容器的 CPU 分配(removeStaleState
  4. 将当前分配写入 checkpoint
3.4.3 Checkpoint 持久化

CPU Manager 使用 cpu_manager_state 文件持久化分配状态:

type CPUManagerCheckpoint struct {
    PolicyName    string                               `json:"policyName"`
    DefaultCPUSet string                               `json:"defaultCPUSet"`
    Entries       map[string]map[string]string          `json:"entries"`  // podUID → containerName → cpuset
    Checksum      checksum.Checksum                     `json:"checksum"`
}

启动时通过 state.NewCheckpointState() 从文件恢复,如果文件不存在或校验失败,则从初始状态重新开始。

3.5 Memory Manager 深度解析

3.5.1 接口定义
type Manager interface {
    Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, 
        podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, 
        initialContainers containermap.ContainerMap) error
    AddContainer(p *v1.Pod, c *v1.Container, containerID string)
    Allocate(pod *v1.Pod, container *v1.Container) error
    RemoveContainer(containerID string) error
    State() state.Reader
    GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
    GetPodTopologyHints(*v1.Pod) map[string][]topologymanager.TopologyHint
    GetHintPorviderName() string
    GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Int
}
3.5.2 Static Policy 分配逻辑

Memory Manager 的 Static Policy 为 Guaranteed Pod 在 NUMA 节点级别静态分配内存:

准入条件

  • Pod QoS 为 Guaranteed
  • Memory Request == Memory Limit

分配流程policy_static.go 逐行解析):

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
    // 1. 仅 Guaranteed Pod
    if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { return nil }
    
    // 2. 幂等检查
    if blocks := s.GetMemoryBlocks(string(pod.UID), container.Name); blocks != nil { return nil }
    
    // 3. 获取 NUMA 亲和性
    hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
    
    // 4. 如果 hint 无 NUMA 亲和性 → 计算默认 hint
    if hint.NUMANodeAffinity == nil {
        defaultHint, err := p.getDefaultHint(s, requestedResources)
        bestHint = defaultHint
    }
    
    // 5. 如果 hint 不满足请求 → 扩展 hint
    if !isAffinitySatisfyRequest(machineState, bestHint.NUMANodeAffinity, requestedResources) {
        extendedHint, err := p.extendTopologyManagerHint(s, requestedResources, bestHint.NUMANodeAffinity)
        bestHint = extendedHint
    }
    
    // 6. 在 NUMA 节点上分配内存块
    for resourceName, requestedSize := range requestedResources {
        containerBlocks = append(containerBlocks, state.Block{
            NUMAAffinity: maskBits,
            Size:         requestedSize,
            Type:         resourceName,  // ResourceMemory 或 Hugepages-*
        })
        
        // 7. 更新机器内存状态
        for _, nodeID := range maskBits {
            // 从 Free 转移到 Reserved
            if nodeResourceMemoryState.Free >= requestedSize {
                nodeResourceMemoryState.Reserved += requestedSize
                nodeResourceMemoryState.Free -= requestedSize
                requestedSize = 0
            } else {
                requestedSize -= nodeResourceMemoryState.Free
                nodeResourceMemoryState.Reserved += nodeResourceMemoryState.Free
                nodeResourceMemoryState.Free = 0
            }
        }
    }
}
3.5.3 Memory Block 数据结构
type Block struct {
    NUMAAffinity []int              // NUMA 节点 ID 列表
    Size         uint64             // 分配大小
    Type         v1.ResourceName    // 资源类型(memory 或 hugepages-*)
}
3.5.4 Machine State

Memory Manager 维护每个 NUMA 节点的内存状态:

type NUMANodeState struct {
    NumberOfAssignments int                                // 分配计数
    Cells              []int                               // NUMA 亲和性组
    MemoryMap          map[v1.ResourceName]*MemoryState    // 每种内存类型的状态
}

type MemoryState struct {
    Free     uint64     // 可用内存
    Reserved uint64     // 已保留内存
}
3.5.5 removeStaleState

与 CPU Manager 类似,Memory Manager 在每次 Allocate 和 GetTopologyHints 调用前都会执行 removeStaleState()

  1. 检查 sourcesReady 是否就绪
  2. 获取活跃 Pod 列表
  3. 构建活跃容器集合
  4. 遍历 MemoryAssignments,移除不在活跃集合中的容器
  5. 释放对应的内存块

3.6 Device Manager 深度解析

3.6.1 接口定义
type Manager interface {
    Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
    GetWatcherHandler() cache.PluginHandler
    Allocate(pod *v1.Pod, container *v1.Container) error
    UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
    GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error)
    GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
    GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
    GetHintPorviderName() string
    GetAllocatableDevices() []*Device
}
3.6.2 Device Plugin 交互模型

Device Manager 通过 gRPC 与 Device Plugin 交互,遵循 Kubernetes Device Plugin API(v1beta1):

注册流程

  1. Device Plugin 在 /var/lib/kubelet/plugins_registry/ 创建 Socket
  2. Plugin Watcher 检测到 Socket → 触发注册流程
  3. Device Manager 通过 gRPC 连接到 Plugin
  4. 调用 GetDevicePluginOptions() 获取选项
  5. 启动 ListAndWatch 流获取设备列表
  6. 设备信息缓存到 endpoint

Endpoint 数据结构

type endpointImpl struct {
    client     pluginapi.DevicePluginClient
    clientConn *grpc.ClientConn
    socketPath   string
    resourceName string
    stopTime     time.Time
    mutex sync.Mutex
    cb    monitorCallback   // 设备状态变化回调
}

Allocate 流程

  1. 从 State 中获取容器的设备分配
  2. 对每个资源类型,调用对应 endpoint 的 allocate() 方法
  3. Device Plugin 返回 ContainerResponse(包含设备路径、挂载点、环境变量等)
  4. 组装为 RunContainerOptions
3.6.3 设备分配状态
type Device struct {
    ID       string                // 设备 ID
    Health   string                // 健康状态
    Topology *pluginapi.TopologyInfo  // NUMA 拓扑信息
}

type DeviceInstanceInfo struct {
    deviceIds sets.String           // 已分配的设备 ID
    allocFunc func(resource string, devs []string) (*pluginapi.AllocateResponse, error)
}

3.7 Topology Manager 深度解析

3.7.1 核心接口
type Manager interface {
    lifecycle.PodAdmitHandler
    AddHintProvider(HintProvider)
    AddContainer(pod *v1.Pod, container *v1.Container, containerID string)
    RemoveContainer(containerID string) error
    Store
}

type HintProvider interface {
    GetHintPorviderName() string
    GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]TopologyHint
    GetPodTopologyHints(pod *v1.Pod) map[string][]TopologyHint
    Allocate(pod *v1.Pod, container *v1.Container) error
}

type TopologyHint struct {
    NUMANodeAffinity bitmask.BitMask
    Preferred        bool
}
3.7.2 策略体系

Topology Manager 支持四种策略:

策略 类名 行为
none nonePolicy 不做拓扑协调,仅调用 Allocate
best-effort bestEffortPolicy 尽力满足 NUMA 亲和性,即使无法满足也允许 Pod 运行
restricted restrictedPolicy 必须满足 NUMA 亲和性,否则拒绝 Pod
single-numa-node singleNumaNodePolicy 所有资源必须分配在同一 NUMA 节点上,否则拒绝 Pod

策略继承关系

nonePolicy → 直接调用 Allocate
bestEffortPolicy → Merge 返回 (hint, true)   // 总是 admit
restrictedPolicy → Merge 返回 (hint, admit)  // 有亲和性才 admit
singleNumaNodePolicy → 继承 restrictedPolicy + 额外限制只允许单 NUMA
3.7.3 Hint 合并算法

Merge() 是 Topology Manager 的核心算法,将多个 HintProvider 返回的 hints 合并为一个最优 hint:

1. filterProvidersHints():将所有 provider 的 hints 扁平化
   - 如果 provider 返回空 hints → 插入 {nil, true}(无偏好)
   - 如果 provider 返回空列表 → 插入 {nil, false}(无可能分配)

2. iterateAllProviderTopologyHints():遍历所有排列组合
   - 递归枚举所有 provider hints 的笛卡尔积
   - 对每个排列调用 mergePermutation()

3. mergePermutation():合并排列中的所有 hints
   - NUMANodeAffinity = 所有 hints 的 Bitwise AND
   - Preferred = 所有 hints 都 Preferred 时才为 true

4. 选择 bestHint:
   - Preferred 优先于 non-Preferred
   - 同等 Preferred 时,更窄的 NUMANodeAffinity 优先
   - Count()==0 的 hint 被忽略
3.7.4 Scope:Container vs Pod

Topology Manager 支持两种作用域:

Container Scope (scope_container.go):

  • 对 Pod 中的每个容器独立计算拓扑亲和性
  • 每个容器可以有不同的 NUMA 亲和性
  • Admit 流程:遍历所有容器 → accumulateProvidersHints()calculateAffinity()allocateAlignedResources()

Pod Scope (scope_pod.go):

  • 整个 Pod 共享一个拓扑亲和性
  • 使用 GetPodTopologyHints() 而非 GetTopologyHints()
  • Admit 流程:一次性计算 Pod 级 hint → 所有容器共享同一 hint → 逐一 Allocate
3.7.5 NUMA 节点数限制
const maxAllowableNUMANodes = 8

超过 8 个 NUMA 节点时,hint 排列组合将导致状态爆炸,Topology Manager 拒绝初始化。

3.8 内部容器生命周期接口

type InternalContainerLifecycle interface {
    PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error
    PostStopContainer(pod *v1.Pod, containerID containerID) error
}

type internalContainerLifecycleImpl struct {
    cpuManager    cpumanager.Manager
    memoryManager memorymanager.Manager
    topologyManager topologymanager.Manager
}

PreStartContainer 在容器启动前调用:

  • topologyManager.AddContainer()
  • cpuManager.AddContainer()
  • memoryManager.AddContainer()

四、Plugin Manager 深度解析

4.1 模块整体结构

4.1.1 核心接口与实现
type PluginManager interface {
    Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
    AddHandler(pluginType string, pluginHandler cache.PluginHandler)
}

type pluginManager struct {
    desiredStateOfWorldPopulator *pluginwatcher.Watcher
    reconciler          reconciler.Reconciler
    desiredStateOfWorld cache.DesiredStateOfWorld
    actualStateOfWorld  cache.ActualStateOfWorld
}
4.1.2 PluginHandler 接口
type PluginHandler interface {
    ValidatePlugin(pluginName string, endpoint string, versions []string) error
    RegisterPlugin(pluginName, endpoint string, versions []string) error
    DeRegisterPlugin(pluginName string)
}

状态机:

Socket Created → Validate → Register → Socket Deleted → DeRegister
                    ↑           |
                    |           | Socket created with same name
                    +-----------+  (ReRegistration)

关键约束:对于同一插件名,操作严格串行。例如:Register(“foo”) 未完成前,不会收到 DeRegister(“foo”) 或 Validate(“foo”, differentEndpoint) 调用。

4.2 Plugin Watcher 深度解析

4.2.1 核心数据结构
type Watcher struct {
    path                string                    // 监听的 Socket 目录
    fs                  utilfs.Filesystem
    fsWatcher           *fsnotify.Watcher
    desiredStateOfWorld cache.DesiredStateOfWorld
}
4.2.2 Start 流程
func (w *Watcher) Start(stopCh <-chan struct{}) error {
    // 1. 初始化:创建目录
    w.init()
    
    // 2. 创建 fsnotify Watcher
    fsWatcher, _ := fsnotify.NewWatcher()
    
    // 3. 遍历已有 Socket 文件(处理 Kubelet 重启后插件已在运行的情况)
    w.traversePluginDir(w.path)
    
    // 4. 启动事件监听 goroutine
    go func(fsWatcher *fsnotify.Watcher) {
        for {
            select {
            case event := <-fsWatcher.Events:
                if event.Op & fsnotify.Create:
                    w.handleCreateEvent(event)   // → AddOrUpdatePlugin to DSW
                elif event.Op & fsnotify.Remove:
                    w.handleDeleteEvent(event)   // → RemovePlugin from DSW
            case err := <-fsWatcher.Errors:
                // log error
            case <-stopCh:
                w.fsWatcher.Close()
                return
            }
        }
    }(fsWatcher)
}
4.2.3 handleCreateEvent 事件处理
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
    // 1. 忽略以 '.' 开头的文件
    if strings.HasPrefix(fi.Name(), ".") { return nil }
    
    // 2. 如果是普通文件:
    if !fi.IsDir() {
        // 2a. 检查是否为 Unix Domain Socket
        isSocket, _ := util.IsUnixDomainSocket(event.Name)
        if !isSocket { return nil }
        
        // 2b. 添加到 DSW(或更新时间戳)
        w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
    }
    
    // 3. 如果是目录:递归遍历
    return w.traversePluginDir(event.Name)
}

4.3 Plugin Manager Reconciler 深度解析

4.3.1 核心数据结构
type reconciler struct {
    operationExecutor   operationexecutor.OperationExecutor
    loopSleepDuration   time.Duration     // 默认 1s
    desiredStateOfWorld cache.DesiredStateOfWorld
    actualStateOfWorld  cache.ActualStateOfWorld
    handlers            map[string]cache.PluginHandler  // pluginType → handler
    sync.RWMutex
}
4.3.2 reconcile 流程
func (rc *reconciler) reconcile() {
    // 阶段一:反注册不再需要的插件(先于注册执行)
    for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {
        unregisterPlugin := false
        
        // 情况A: 插件不在 DSW 中 → 需要反注册
        if !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {
            unregisterPlugin = true
        } else {
            // 情况B: 插件在 DSW 中但时间戳不同 → 需要先反注册再重新注册
            for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
                if dswPlugin.SocketPath == registeredPlugin.SocketPath && 
                   dswPlugin.Timestamp != registeredPlugin.Timestamp {
                    unregisterPlugin = true
                    break
                }
            }
        }
        
        if unregisterPlugin {
            rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
        }
    }
    
    // 阶段二:注册需要的插件
    for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
        if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
            rc.operationExecutor.RegisterPlugin(
                pluginToRegister.SocketPath, 
                pluginToRegister.Timestamp, 
                rc.getHandlers(),      // 传入所有注册的 Handler
                rc.actualStateOfWorld)
        }
    }
}

4.4 OperationExecutor 与 OperationGenerator

4.4.1 Register 操作流程
1. OperationExecutor.RegisterPlugin(socketPath, timestamp, handlers, asw)
2. 通过 goroutinemap 确保同一 socket 不会并发操作
3. 启动 goroutine 执行:
   a. 通过 Socket 建立连接
   b. 获取 Plugin Info(名称、版本)
   c. 遍历 handlers:
      - handler.ValidatePlugin(pluginName, endpoint, versions)
      - handler.RegisterPlugin(pluginName, endpoint, versions)
   d. 成功后:asw.AddPlugin(pluginInfo)
4.4.2 Unregister 操作流程
1. OperationExecutor.UnregisterPlugin(registeredPlugin, asw)
2. 启动 goroutine 执行:
   a. 遍历 handlers:
      - handler.DeRegisterPlugin(pluginName)
   b. asw.RemovePlugin(socketPath)

五、辅助子系统

5.1 Checkpoint Manager

5.1.1 接口
type Checkpoint interface {
    MarshalCheckpoint() ([]byte, error)
    UnmarshalCheckpoint(blob []byte) error
    VerifyChecksum() error
}

type CheckpointManager interface {
    CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error
    GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error
    RemoveCheckpoint(checkpointKey string) error
    ListCheckpoints() ([]string, error)
}
5.1.2 实现
type impl struct {
    path  string
    store utilstore.Store   // FileStore 后端
    mutex sync.Mutex
}
  • CreateCheckpoint:序列化 → 写入文件
  • GetCheckpoint:读取文件 → 反序列化 → VerifyChecksum
  • 底层使用 utilstore.FileStore,文件存储在 --root-dir 下的 checkpoint 目录
  • Checksum 使用 checksum.Checksum 类型(通常为 CRC32 或类似算法)

使用场景

  • CPU Manager:cpu_manager_state 文件
  • Memory Manager:memory_manager_state 文件
  • Device Manager:kubelet_internal_checkpoint 文件

5.2 Certificate Manager

Certificate Manager 负责 kubelet 的证书轮换和引导:

  • certificate/transport.go:管理传输层证书
  • certificate/bootstrap/bootstrap.go:TLS 引导流程
  • certificate/kubelet.go:kubelet 证书管理

核心功能:

  1. Bootstrap:kubelet 启动时使用 bootstrap token 获取初始证书
  2. Rotation:证书接近过期时自动轮换
  3. Storage:将证书和密钥存储到指定目录

5.3 Cloud Resource Manager

type CloudRequestManager interface {
    Run(stopCh <-chan struct{})
    GetCloudNode(node *v1.Node) (*v1.Node, error)
}

负责与云提供商交互,获取节点信息(如云实例类型、区域等),更新 Node 对象。


六、模块间协作与集成

6.1 Pod 准入流程中的资源分配

当 Pod 被调度到节点后,Kubelet 通过 Pod Admit Handler 链进行准入检查:

1. TopologyManager.Admit() (如果启用)
   ├── 对每个容器调用 HintProvider.GetTopologyHints()
   │   ├── CPUManager.GetTopologyHints()
   │   ├── MemoryManager.GetTopologyHints()
   │   └── DeviceManager.GetTopologyHints()
   ├── Policy.Merge() 合并 hints
   ├── 如果 admit → 对每个容器调用 HintProvider.Allocate()
   │   ├── CPUManager.Allocate()  → 静态 CPU 分配
   │   ├── MemoryManager.Allocate() → NUMA 内存分配
   │   └── DeviceManager.Allocate() → 设备分配
   └── 如果 !admit → 拒绝 Pod

2. resourceAllocator.Admit() (Topology Manager 未启用时的 fallback)
   ├── DeviceManager.Allocate()
   ├── CPUManager.Allocate()
   └── MemoryManager.Allocate()

6.2 Container Manager 与 Plugin Manager 的集成

Container Manager 通过 GetPluginRegistrationHandler() 返回 Device Manager 的 Watcher Handler,由 Plugin Manager 在注册 Device Plugin 时调用:

PluginManager.AddHandler("device-plugin", deviceManager.GetWatcherHandler())

当 Device Plugin 注册时:

  1. Plugin Watcher 检测到 Socket
  2. Plugin Reconciler 触发 RegisterPlugin
  3. 调用 Device Manager 的 ValidatePlugin()RegisterPlugin()
  4. Device Manager 创建 gRPC endpoint,启动 ListAndWatch

6.3 Volume Manager 与 Container Manager 的交互

两者在 Pod 生命周期中协作但职责独立:

  • Volume Manager 在 SyncPod 之前完成卷的 Attach/Mount
  • Container Manager 在 Pod 准入阶段完成资源分配
  • Container Runtime 创建容器时同时需要:
    • Volume Manager 提供的挂载点信息
    • Container Manager 提供的设备、CPU、内存配置

6.4 Checkpoint 一致性

所有子 Manager 共享相同的 Checkpoint Manager 基础设施,但使用不同的 checkpoint 文件:

  • CPU Manager 写入 cpu_manager_state
  • Memory Manager 写入 memory_manager_state
  • Device Manager 写入 kubelet_internal_checkpoint

每个 checkpoint 文件包含:

  1. Policy 名称
  2. 分配映射(Pod → Container → 资源分配)
  3. 机器状态
  4. Checksum 校验

Kubelet 重启后,各 Manager 从 checkpoint 恢复状态,然后通过 reconcile 机制与运行时实际状态对齐。


七、Mermaid 图集

图1:Volume Manager 整体架构图

VolumeManager

Executor

Reconciler

Cache

Populator

Pod列表

AddPodToVolume

MarkRemountRequired

GetVolumesToMount

GetMountedVolumes

AttachVolume/DetachVolume

MountVolume/UnmountVolume

UnmountDevice

Plugin.Attach

Plugin.Mount

MarkVolumeAsAttached

MarkVolumeAsMounted

MarkVolumeAsUnmounted

MarkVolumeAsDetached

Run

WaitForAttachAndMount

volumeManager

DesiredStateOfWorldPopulator

DesiredStateOfWorld

ActualStateOfWorld

Reconciler

OperationExecutor

OperationGenerator

PodManager

VolumePlugin

Kubelet

图2:ASW/DSW Reconciliation 流程图

否: IsVolumeNotAttached

是, 需Remount

是, 需FSResize

Reconcile Loop Start

unmountVolumes

Pod在DSW中
引用此卷?

OperationExecutor.UnmountVolume

ASW.MarkVolumeAsUnmounted

mountAttachVolumes

卷已附加?

AD Controller
负责?

VerifyControllerAttachedVolume

AttachVolume

卷已挂载?

MountVolume

ExpandInUseVolume

unmountDetachDevices

卷在DSW中?

设备已全局挂载?

UnmountDevice

AD Controller负责?

MarkVolumeAsDetached

DetachVolume

Sleep 100ms

图3:Volume Plugin 接口层次图

VolumePlugin

+Init(host VolumeHost)

+GetPluginName() : string

+GetVolumeSpecMapper() : VolumeSpecMapper

+ConstructVolumeSpec() : Spec

+SupportsMount() : bool

+SupportsBlock() : bool

AttachableVolumePlugin

+NewAttacher() : Attacher

+NewDetacher() : Detacher

+CanAttach() : bool

DeviceMountableVolumePlugin

+NewDeviceMounter() : DeviceMounter

+NewDeviceUnmounter() : DeviceUnmounter

+GetDeviceMountPath() : string

MountableVolumePlugin

+NewMounter() : Mounter

+GetVolumeName() : string

+CanMount() : bool

BlockVolumePlugin

+NewBlockVolumeMapper() : BlockVolumeMapper

+NewBlockVolumeUnmapper() : BlockVolumeUnmapper

ExpandableVolumePlugin

+ExpandVolumeDevice() : error

+RequiresFSResize() : bool

Attacher

+Attach(spec, nodeName) : string

+VolumesAreAttached() : map

+WaitForAttach() : string

Detacher

+Detach(volumeName, nodeName)

+WaitForDetach()

Mounter

+SetUp(mountArgs)

+GetPath() : string

+CanMount() : bool

DeviceMounter

+MountDevice(spec, devicePath, deviceMountPath)

+GetDeviceMountPath() : string

图4:Container Manager 架构图

ContainerManager

States

Policies

SubManagers

HintProvider

HintProvider

HintProvider

Start

GetResources

GetAllocateResourcesPodAdmitHandler

containerManagerImpl

CgroupManager

QOSContainerManager

CPUManager

MemoryManager

DeviceManager

TopologyManager

CPU Policy: none/static

Memory Policy: none/static

Topology Policy: none/best-effort/restricted/single-numa-node

CPU State + Checkpoint

Memory State + Checkpoint

Device State + Checkpoint

Topology Hints Store

Kubelet

图5:cgroup 层次结构图

System

BestEffort

Burstable

Guaranteed

/ (cgroup root)

kubepods/

kubepods.slice/

pod-abc123/

pod-def456/

burstable/

burstable/pod-ghi789/

burstable/pod-jkl012/

besteffort/

besteffort/pod-mno345/

system.slice/

kubelet.service/

docker.service/

图6:CPU Manager Static Policy 流程图

Reconcile循环

获取活跃Pod列表

检查已分配容器
是否仍在运行

清理过期分配
removeStaleState

更新cpuset cgroup

CPUManager.Allocate

Pod QoS =
Guaranteed?

跳过, 不做静态分配

CPU Request ==
CPU Limit && 整数?

容器已有
CPU分配?

返回, 幂等

获取 TopologyManager NUMA亲和性

hint有NUMA
亲和性?

从指定NUMA节点的
freeCPUs分配

从defaultCPUs分配

执行CPU分配

更新State:
- 从freeCPUs移除
- 更新defaultCPUs
- 记录assignment

写入Checkpoint

完成

图7:Memory Manager Static Policy 流程图

MemoryManager.Allocate

Pod QoS =
Guaranteed?

跳过

容器已有
MemoryBlocks?

返回

计算请求的内存资源
ResourceMemory + Hugepages

获取TopologyManager亲和性

hint.NUMANodeAffinity
!= nil?

计算默认hint
getDefaultHint

亲和性满足
请求?

扩展hint
extendTopologyManagerHint

在NUMA节点上
分配内存块

更新MachineState:
Free → Reserved
NumberOfAssignments++

SetMemoryBlocks
记录分配关系

写入Checkpoint

完成

图8:Device Plugin 交互时序图

Endpoint Device Manager Plugin Manager Plugin Watcher Device Plugin Endpoint Device Manager Plugin Manager Plugin Watcher Device Plugin 注册阶段 分配阶段 反注册阶段 创建Socket文件 fsnotify检测到Create事件 AddOrUpdatePlugin to DSW Reconciler检测到DSW与ASW不一致 ValidatePlugin(name, endpoint, versions) GetInfo() gRPC调用 PluginInfo(name, version, endpoint) RegisterPlugin(name, endpoint, versions) 建立gRPC连接 GetDevicePluginOptions() 创建endpoint ListAndWatch() 启动流 Stream: DeviceList[device1, device2...] callback(resourceName, devices) 更新设备缓存和allocatable Allocate(pod, container) endpoint.allocate(deviceIDs) Allocate() gRPC调用 AllocateResponse[devices, mounts, envs] ContainerResponse 组装RunContainerOptions 删除Socket文件 fsnotify检测到Remove事件 RemovePlugin from DSW Reconciler检测到ASW有DSW无 DeRegisterPlugin(name) stop()

图9:Plugin Manager 注册流程图

Create

Remove

否, 时间戳不同

成功

成功

失败

Socket文件创建/删除

事件类型

是Unix
Domain Socket?

忽略非Socket文件

AddOrUpdatePlugin to DSW
更新时间戳

RemovePlugin from DSW

Reconciler循环 1s间隔

阶段一: 反注册

插件在ASW中
但不在DSW?

UnregisterPlugin

阶段二: 注册

插件在DSW中
但不在ASW?

RegisterPlugin

handler.ValidatePlugin

handler.RegisterPlugin

ASW.AddPlugin

忽略

handler.DeRegisterPlugin

ASW.RemovePlugin

图10:Checkpoint 持久化流程图

FileSystem

CkptMgr

DevMgr

MemMgr

CPUMgr

MarshalCheckpoint

CreateCheckpoint

Write

MarshalCheckpoint

CreateCheckpoint

Write

MarshalCheckpoint

CreateCheckpoint

Write

Read

GetCheckpoint

UnmarshalCheckpoint
+ VerifyChecksum

CPU Manager State

CPUManagerCheckpoint

Memory Manager State

MemoryManagerCheckpoint

Device Manager State

DeviceManagerCheckpoint

CheckpointManager
+ FileStore

cpu_manager_state

memory_manager_state

kubelet_internal_checkpoint

图11:Topology Manager 准入流程图

none

best-effort/restricted/
single-numa-node

container

pod

best-effort: 总是admit

restricted: 有亲和性才admit

single-numa-node: 单NUMA才admit

拒绝

TopologyManager.Admit

Policy类型

直接调用
所有HintProvider.Allocate

Admit Pod

Scope类型

遍历所有容器

Pod整体计算

accumulateProvidersHints
CPUManager.GetTopologyHints
MemoryManager.GetTopologyHints

accumulateProvidersHints
CPUManager.GetPodTopologyHints
MemoryManager.GetPodTopologyHints

Policy.Merge

filterProvidersHints
扁平化所有hints

iterateAllProviderTopologyHints
笛卡尔积遍历

mergePermutation
Bitwise AND合并NUMA亲和性

选择bestHint:
1. Preferred优先
2. 更窄亲和性优先

Policy判断

setTopologyHints

allocateAlignedResources
所有HintProvider.Allocate

TopologyAffinityError
Pod拒绝

图12:Volume Attach/Detach 完整流程图

Container Runtime Volume Plugin OperationExecutor Reconciler Actual State Desired State Populator Volume Manager Kubelet Scheduler API Server Container Runtime Volume Plugin OperationExecutor Reconciler Actual State Desired State Populator Volume Manager Kubelet Scheduler API Server Pod调度到节点 WaitForAttachAndMount alt [Kubelet负责Attach] [AD Controller负责] alt [卷未附加] alt [需要DeviceMount] alt [卷已附加但未挂载] loop [Reconcile循环 100ms] Pod删除 alt [Pod不再需要此卷] alt [设备已全局挂载] [需要Detach] alt [卷不再需要] loop [Reconcile循环] Pod绑定到Node Pod更新 SyncPod findAndAddNewPods createVolumeSpec (PVC→PV解析) AddPodToVolume WaitForAttachAndMount(pod) GetVolumesToMount PodExistsInVolume AttachVolume Attacher.Attach devicePath MarkVolumeAsAttached VerifyControllerAttachedVolume 获取Node.Status.VolumesAttached MarkVolumeAsAttached MountVolume DeviceMounter.MountDevice MarkDeviceAsMounted Mounter.SetUp MarkVolumeAsMounted 所有卷已挂载 CreateContainer (挂载点已就绪) PodRemoved DeletePodFromVolume GetAllMountedVolumes PodExistsInVolume UnmountVolume Mounter.TearDown MarkVolumeAsUnmounted GetUnmountedVolumes VolumeExists UnmountDevice DeviceUnmounter.UnmountDevice DetachVolume Detacher.Detach MarkVolumeAsDetached

附录A:关键数据流总结

Volume Manager 数据流

PodManager.GetPods() 
  → Populator.findAndAddNewPods() 
    → createVolumeSpec() [PVC→PV解析, CSI迁移]
    → DSW.AddPodToVolume()
  → Populator.findAndRemoveDeletedPods()
    → DSW.DeletePodFromVolume()

Reconciler.reconcile() [每100ms]
  → ASW.GetAllMountedVolumes() vs DSW.PodExistsInVolume()
    → OE.UnmountVolume()
  → DSW.GetVolumesToMount() vs ASW.PodExistsInVolume()
    → OE.AttachVolume() / VerifyControllerAttachedVolume()
    → OE.MountVolume() / ExpandInUseVolume()
  → ASW.GetUnmountedVolumes() vs DSW.VolumeExists()
    → OE.UnmountDevice() / DetachVolume()

Container Manager 数据流

Kubelet Pod Admit:
  → TopologyManager.Admit()
    → [CPU/Memory/Device]Manager.GetTopologyHints()
    → Policy.Merge() → bestHint
    → [CPU/Memory/Device]Manager.Allocate()
  
Kubelet SyncPod:
  → ContainerManager.GetResources()
    → DeviceManager.GetDeviceRunContainerOptions()
  → InternalContainerLifecycle.PreStartContainer()
    → TopologyManager.AddContainer()
    → CPUManager.AddContainer()
    → MemoryManager.AddContainer()

Container Removal:
  → CPUManager.RemoveContainer()
  → MemoryManager.RemoveContainer()
  → TopologyManager.RemoveContainer()

Plugin Manager 数据流

fsnotify Event (Socket Create/Delete)
  → PluginWatcher.handleCreateEvent/handleDeleteEvent
  → DSW.AddOrUpdatePlugin / RemovePlugin

Reconciler.reconcile() [每1s]
  → ASW.GetRegisteredPlugins() vs DSW.PluginExists()
    → OE.UnregisterPlugin() → handler.DeRegisterPlugin()
  → DSW.GetPluginsToRegister() vs ASW.PluginExistsWithCorrectTimestamp()
    → OE.RegisterPlugin() → handler.ValidatePlugin() → handler.RegisterPlugin()

附录B:配置参数与Feature Gate影响

Feature Gate 影响的模块 行为变化
CPUManager CPU Manager 启用 CPU 亲和性管理
MemoryManager Memory Manager 启用 NUMA 内存分配
TopologyManager Topology Manager 启用拓扑感知资源协调
DevicePlugins Device Manager 启用 Device Plugin 支持
ExpandInUsePersistentVolumes Volume Manager Populator 启用在线卷扩容
BlockVolume Volume Manager 支持块卷模式
GenericEphemeralVolume Volume Manager Populator 支持 Generic Ephemeral 内联卷
StorageObjectInUseProtection Volume Manager Populator PVC 删除保护
CSIMigration Volume Manager CSI 迁移支持
配置参数 影响的模块 默认值
--cgroups-per-qos Container Manager true
--cgroup-driver Container Manager cgroupfs
--cpu-manager-policy CPU Manager none
--cpu-manager-reconcile-period CPU Manager 10s
--memory-manager-policy Memory Manager none
--topology-manager-policy Topology Manager none
--topology-manager-scope Topology Manager container
--fail-swap-on Container Manager true
--keep-terminated-pod-volumes Volume Manager false

附录C:错误处理与恢复机制

Volume Manager 错误处理

  1. 操作冲突:通过 goroutinemapnestedpendingoperations 防止同一卷的并发操作
  2. 指数退避:操作失败后通过 exponentialbackoff 逐渐增加重试间隔
  3. Pod 错误聚合:DSW 通过 AddErrorToPod / PopPodErrors 聚合 Pod 级别的卷错误
  4. Kubelet 重启恢复syncStates() 从磁盘扫描恢复卷状态

Container Manager 错误处理

  1. Checkpoint 校验:启动时通过 VerifyChecksum() 验证 checkpoint 完整性
  2. Stale State 清理:各子 Manager 定期执行 removeStaleState(),清理已终止容器的分配
  3. Pod 准入拒绝:Topology Manager 和 resourceAllocator 在分配失败时拒绝 Pod
  4. Cgroup 重建setupNode() 在启动时重建 QoS cgroup 层次

Plugin Manager 错误处理

  1. 注册失败:ValidatePlugin/RegisterPlugin 失败时不更新 ASW,下次循环重试
  2. ReRegistration:Socket 文件重建时通过时间戳检测,先反注册再注册
  3. Socket 消失:fsnotify 的 Remove 事件触发 DSW 移除,Reconciler 执行反注册

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐