【kubernetes v1.21】(kubelet 4)Kubelet Volume Manager、Container Manager 与 Plugin System
Part 4: Kubelet Volume Manager、Container Manager 与 Plugin System 超深度分析
Part 4: Kubelet Volume Manager、Container Manager 与 Plugin System 超深度分析
目录
- 一、模块定位
- 二、Volume Manager 深度解析
- 三、Container Manager 深度解析
- 四、Plugin Manager 深度解析
- 五、辅助子系统
- 六、模块间协作与集成
- 七、Mermaid 图集
一、模块定位
1.1 Volume Manager — 存储生命周期编排器
业务职责:Volume Manager 是 Kubelet 中负责节点级别卷(Volume)生命周期管理的核心子系统。它监控调度到本节点的 Pod,确定哪些卷需要被挂载(Mount)/卸载(Unmount)、附加(Attach)/分离(Detach),并通过异步循环驱动实际操作执行。
源码位置:pkg/kubelet/volumemanager/
核心设计理念:
- ASW/DSW 双缓存模型:Desired State of World(期望状态)+ Actual State of World(实际状态),通过 Reconciler 持续比对并驱动收敛
- 声明式驱动:Pod 声明需要什么卷 → Populator 更新 DSW → Reconciler 发现差异 → OperationExecutor 执行操作 → ASW 更新
- CSI 迁移兼容:通过 csiMigratedPluginManager 和 intreeToCSITranslator 支持 in-tree 插件向 CSI 平滑迁移
1.2 Container Manager — 资源分配与隔离指挥中心
业务职责:Container Manager(CM)是 Kubelet 中管理容器资源分配与隔离的核心子系统,涵盖 cgroup 管理、CPU 亲和性、内存亲和性、设备分配、NUMA 拓扑协调等。它确保 Pod 的资源请求在节点上得到正确实施,包括 QoS 分级、Node Allocatable 约束、以及硬件拓扑感知的资源分配。
源码位置:pkg/kubelet/cm/
核心设计理念:
- 层级化管理:Container Manager → 子 Manager(CPU/Memory/Device/Topology)→ Policy → State
- Topology-First 策略:Topology Manager 协调所有 Hint Provider(CPU/Memory/Device),在 Pod 准入阶段统一计算 NUMA 亲和性
- Checkpoint 持久化:各子 Manager 的分配状态通过 Checkpoint Manager 持久化到磁盘,支持 Kubelet 重启后恢复
1.3 Plugin Manager — 插件注册发现协调器
业务职责:Plugin Manager 负责监听节点上插件 Socket 文件的创建与删除,协调插件的注册(Register)与反注册(Deregister)流程。它是 Device Plugin、CSI Plugin 等第三方扩展与 Kubelet 交互的桥梁。
源码位置:pkg/kubelet/pluginmanager/
核心设计理念:
- 文件系统驱动:通过 fsnotify 监听 Socket 目录,将文件创建/删除事件转化为插件注册/反注册意图
- ASW/DSW 模型:与 Volume Manager 采用相同的 Desired/Actual State 模式
- Handler 抽象:通过 PluginHandler 接口将具体的注册逻辑委托给消费者(如 Device Manager)
二、Volume Manager 深度解析
2.1 模块整体结构
2.1.1 核心类型与接口
// VolumeManager 顶层接口
type VolumeManager interface {
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
WaitForAttachAndMount(pod *v1.Pod) error
GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
GetVolumesInUse() []v1.UniqueVolumeName
ReconcilerStatesHasBeenSynced() bool
VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
// volumeManager 具体实现
type volumeManager struct {
kubeClient clientset.Interface
volumePluginMgr *volume.VolumePluginMgr
desiredStateOfWorld cache.DesiredStateOfWorld // 期望状态缓存
actualStateOfWorld cache.ActualStateOfWorld // 实际状态缓存
operationExecutor operationexecutor.OperationExecutor // 操作执行器
reconciler reconciler.Reconciler // 协调器
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator // 期望状态填充器
csiMigratedPluginManager csimigration.PluginManager
intreeToCSITranslator csimigration.InTreeToCSITranslator
}
2.1.2 关键常量
| 常量 | 值 | 含义 |
|---|---|---|
reconcilerLoopSleepPeriod |
100ms | Reconciler 循环间隔 |
desiredStateOfWorldPopulatorLoopSleepPeriod |
100ms | Populator 循环间隔 |
desiredStateOfWorldPopulatorGetPodStatusRetryDuration |
2s | Pod 状态重试间隔 |
podAttachAndMountTimeout |
2m3s | Pod 卷挂载超时 |
podAttachAndMountRetryInterval |
300ms | Pod 卷挂载重试间隔 |
waitForAttachTimeout |
10m | 等待 Attach 超时 |
2.1.3 依赖注入关系
Volume Manager 的创建通过 NewVolumeManager() 工厂函数完成,注入以下依赖:
kubeClient:API Server 通信volumePluginMgr:卷插件管理器(必须预初始化)mounter:文件系统挂载接口hostutil:主机工具接口blockVolumePathHandler:块卷路径处理
内部组件的依赖关系为:
volumeManager
├── desiredStateOfWorld (依赖 volumePluginMgr)
├── actualStateOfWorld (依赖 nodeName, volumePluginMgr)
├── operationExecutor
│ └── operationGenerator (依赖 kubeClient, volumePluginMgr, recorder, blockVolumePathHandler)
├── reconciler (依赖所有上述组件 + mounter, hostutil)
└── desiredStateOfWorldPopulator (依赖 kubeClient, podManager, podStatusProvider, DSW, ASW, containerRuntime, csiMigratedPluginManager)
2.2 Desired State of World (DSW) 深度解析
DSW 是 Volume Manager 的"意图存储",记录"哪些 Pod 需要哪些卷"。
2.2.1 接口定义
type DesiredStateOfWorld interface {
AddPodToVolume(podName volumetypes.UniquePodName, pod *v1.Pod,
volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGidValue string) (v1.UniqueVolumeName, error)
DeletePodFromVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName)
MarkVolumesReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
AddErrorToPod(podName volumetypes.UniquePodName, err string)
PopPodErrors(podName volumetypes.UniquePodName) []string
GetVolumesToMount() []VolumeToMount
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, error)
VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool
VolumeExists(volumeName v1.UniqueVolumeName) bool
MarkVolumeAttachability(volumeName v1.UniqueVolumeName, attachable bool)
GetPodsWithErrors() []volumetypes.UniquePodName
}
2.2.2 核心数据结构
DSW 内部维护两个关键映射:
volumesToMount:map[v1.UniqueVolumeName]volumeToMount— 卷名到卷挂载信息的映射podsToMarkUnmounted: 已标记为卸载的 Pod 集合
每个 volumeToMount 条目包含:
volumeName: 唯一卷名pod: Pod 对象引用volumeSpec: 卷规格pluginIsAttachable: 插件是否支持 AttachreportedInUse: 是否已报告为使用中desiredSizeLimit: 期望的卷大小限制
2.2.3 AddPodToVolume 流程
- 根据
volumeSpec和volumePluginMgr查找对应插件 - 判断插件是否支持 Attach 操作
- 根据 PV/PVC 信息生成
UniqueVolumeName - 如果卷已存在于 DSW 中,仅添加 Pod 到该卷的
podsToMount映射 - 如果卷不存在,创建新的
volumeToMount条目 - 所有操作在写锁保护下进行
2.3 Actual State of World (ASW) 深度解析
ASW 是 Volume Manager 的"事实存储",记录"哪些卷实际上已经附加/挂载到本节点"。
2.3.1 接口定义
type ActualStateOfWorld interface {
MarkVolumeAsAttached(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec,
nodeName types.NodeName, devicePath string) error
MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error
MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath string, deviceMountPath string) error
MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
MarkRemountRequired(podName volumetypes.UniquePodName)
MarkFSResizeRequired(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName)
GetMountedVolumesForPod(podName volumetypes.UniquePodName) []MountedVolume
GetAllMountedVolumes() []MountedVolume
GetUnmountedVolumes() []AttachedVolume
GetAttachedVolumes() []AttachedVolume
GetMountedVolumes() []MountedVolume
GetVolumesInUse() []v1.UniqueVolumeName
VolumeExists(volumeName v1.UniqueVolumeName) bool
VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, error)
}
2.3.2 核心数据结构
ASW 内部维护三个关键映射:
attachedVolumes:map[v1.UniqueVolumeName]attachedVolume— 已附加的卷mountedVolumes: 从attachedVolume中派生,记录卷到 Pod 的挂载关系deviceMountPaths:map[v1.UniqueVolumeName]deviceMountPath— 设备挂载路径
每个 attachedVolume 包含:
volumeName: 唯一卷名volumeSpec: 卷规格nodeName: 节点名devicePath: 设备路径deviceMountPath: 设备挂载路径pluginIsAttachable: 是否支持 AttachdeviceMayBeMounted: 设备可能已挂载标记mountedPods:map[volumetypes.UniquePodName]mountedPod— 已挂载到此卷的 Pod
2.4 DesiredStateOfWorldPopulator 深度解析
Populator 是 DSW 的数据来源,它定期从 PodManager 获取 Pod 列表,将卷信息注入 DSW。
2.4.1 核心数据结构
type desiredStateOfWorldPopulator struct {
kubeClient clientset.Interface
loopSleepDuration time.Duration
getPodStatusRetryDuration time.Duration
podManager pod.Manager
podStatusProvider status.PodStatusProvider
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
pods processedPods
kubeContainerRuntime kubecontainer.Runtime
timeOfLastGetPodStatus time.Time
keepTerminatedPodVolumes bool
hasAddedPods bool
hasAddedPodsLock sync.RWMutex
csiMigratedPluginManager csimigration.PluginManager
intreeToCSITranslator csimigration.InTreeToCSITranslator
volumePluginMgr *volume.VolumePluginMgr
}
type processedPods struct {
processedPods map[volumetypes.UniquePodName]bool
sync.RWMutex
}
2.4.2 运行流程
Populator 的 Run() 方法启动两个阶段:
阶段一:等待所有 Source 就绪
wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
done := sourcesReady.AllReady()
dswp.populatorLoop()
return done, nil
}, stopCh)
dswp.hasAddedPods = true
阶段二:持续循环
wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
每个 populatorLoop() 执行两个操作:
findAndAddNewPods():遍历 PodManager 中的所有 Pod,将新 Pod 的卷添加到 DSWfindAndRemoveDeletedPods():检查 DSW 中的 Pod 是否仍然存在,删除已终止 Pod 的卷
2.4.3 processPodVolumes 逐行解析
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
pod *v1.Pod,
mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
processedVolumesForFSResize sets.String) {
// 1. nil 检查
if pod == nil { return }
// 2. 去重:如果 Pod 已处理过,跳过
uniquePodName := util.GetUniquePodName(pod)
if dswp.podPreviouslyProcessed(uniquePodName) { return }
// 3. 获取 Pod 的所有卷名(mounts 和 devices 分别处理)
allVolumesAdded := true
mounts, devices := util.GetPodVolumeNames(pod)
// 4. 遍历 Pod Spec 中的每个 Volume
for _, podVolume := range pod.Spec.Volumes {
// 4a. 跳过 Pod 未使用的卷
if !mounts.Has(podVolume.Name) && !devices.Has(podVolume.Name) { continue }
// 4b. 创建 VolumeSpec(处理 PVC 解引用、CSI 迁移等)
pvc, volumeSpec, volumeGidValue, err := dswp.createVolumeSpec(podVolume, pod, mounts, devices)
if err != nil {
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
allVolumesAdded = false
continue
}
// 4c. 添加到 DSW
_, err = dswp.desiredStateOfWorld.AddPodToVolume(
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
// 4d. 如果启用了在线扩容,检查文件系统是否需要 Resize
if expandInUsePV {
dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec,
uniquePodName, mountedVolumesForPod, processedVolumesForFSResize)
}
}
// 5. 标记 Pod 已处理
if allVolumesAdded {
dswp.markPodProcessed(uniquePodName)
dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
dswp.desiredStateOfWorld.PopPodErrors(uniquePodName)
}
}
2.4.4 createVolumeSpec 深度解析
这是 Populator 最复杂的方法之一,负责将 Pod Spec 中的 Volume 声明解析为具体的 VolumeSpec:
- PVC 卷:调用
getPVCExtractPV()从 API Server 获取 PVC → 解析出 PV → 调用getPVSpec()获取 PV Spec- 验证 PVC 是否在删除中(StorageObjectInUseProtection feature gate)
- 验证 PVC 是否已 Bound
- 检查卷模式(Block vs Filesystem)与 Pod 中的使用方式是否匹配
- Ephemeral 内联卷:如果 feature gate 启用,将其视为 PVC 处理
- In-tree 卷:直接创建 VolumeSpec
- CSI 迁移:对于所有可迁移的卷,调用
TranslateInTreeSpecToCSI()将 in-tree Spec 翻译为 CSI Spec
2.5 Reconciler 深度解析
Reconciler 是 Volume Manager 的执行引擎,它持续比对 DSW 和 ASW,驱动操作执行。
2.5.1 核心数据结构
type reconciler struct {
kubeClient clientset.Interface
controllerAttachDetachEnabled bool // AD Controller 是否接管 Attach/Detach
loopSleepDuration time.Duration
waitForAttachTimeout time.Duration
nodeName types.NodeName
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
populatorHasAddedPods func() bool
operationExecutor operationexecutor.OperationExecutor
mounter mount.Interface
hostutil hostutil.HostUtils
volumePluginMgr *volumepkg.VolumePluginMgr
kubeletPodsDir string
timeOfLastSync time.Time // 最后一次 sync 时间
}
2.5.2 reconciliationLoopFunc
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
rc.reconcile() // 主协调逻辑
// 当 Populator 已完成 Pod 添加且尚未同步过状态时,执行一次性 sync
if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
rc.sync() // 从磁盘扫描重建卷状态
}
}
}
2.5.3 reconcile() 三阶段流程
阶段一:卸载不再需要的卷(unmountVolumes)
func (rc *reconciler) unmountVolumes() {
for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
// 检查 Pod 是否仍在 DSW 中引用此卷
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// 卷已挂载但不再需要 → 触发 UnmountVolume
rc.operationExecutor.UnmountVolume(mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
}
}
}
关键点:卸载在挂载之前执行,确保一个卷从旧 Pod 卸载后才挂载到新 Pod。
阶段二:挂载/附加需要的卷(mountAttachVolumes)
func (rc *reconciler) mountAttachVolumes() {
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
if cache.IsVolumeNotAttachedError(err) {
// 卷未附加 → 分两种情况
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
// 情况A: AD Controller 负责,或插件不支持 Attach → 验证 Controller 是否已附加
rc.operationExecutor.VerifyControllerAttachedVolume(...)
} else {
// 情况B: Kubelet 自己负责 Attach → 执行 AttachVolume
rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
}
} else if !volMounted || cache.IsRemountRequiredError(err) {
// 卷已附加但未挂载,或需要重新挂载 → 执行 MountVolume
rc.operationExecutor.MountVolume(...)
} else if cache.IsFSResizeRequiredError(err) {
// 文件系统需要在线扩容 → 执行 ExpandInUseVolume
rc.operationExecutor.ExpandInUseVolume(...)
}
}
}
阶段三:分离/卸载设备(unmountDetachDevices)
func (rc *reconciler) unmountDetachDevices() {
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// 卷已附加但无 Pod 挂载,且不在 DSW 中
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
!rc.operationExecutor.IsOperationPending(...) {
if attachedVolume.DeviceMayBeMounted() {
// 设备全局挂载中 → 先 UnmountDevice
rc.operationExecutor.UnmountDevice(...)
} else {
// 设备未全局挂载 → Detach 或标记已分离
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
rc.actualStateOfWorld.MarkVolumeAsDetached(...)
} else {
rc.operationExecutor.DetachVolume(...)
}
}
}
}
}
2.5.4 syncStates — Kubelet 重启后卷状态重建
syncStates() 是 Reconciler 中最复杂的部分,在 Kubelet 重启后执行一次,从磁盘扫描 Pod 目录重建卷状态:
- 扫描
/pods/{podUID}/volume/{pluginName}/{volumeName}和/pods/{podUID}/volumeDevices/{pluginName}/{volumeName} - 对每个发现的卷:
- 如果 ASW 中已存在 → 跳过
- 尝试
reconstructVolume():通过 plugin 重建 VolumeSpec、Mounter/BlockVolumeMapper - 如果重建成功且卷在 DSW 中 → 标记为 InUse(避免误卸载)
- 如果重建成功但不在 DSW 中 → 添加到 ASW(孤儿卷恢复)
- 如果重建失败且不在 DSW 中 →
cleanupMounts()清理
- 调用
updateDevicePath()从 Node Status 的VolumesAttached获取设备路径
2.6 OperationExecutor
OperationExecutor 是卷操作的异步执行器,通过 goroutinemap 和 nestedpendingoperations 确保同一卷不会同时执行多个操作。关键操作:
AttachVolume:调用 Volume Plugin 的 Attacher.Attach()DetachVolume:调用 Detacher.Detach()MountVolume:调用 Mounter.SetUp() 或 DeviceMounter.MountDevice()UnmountVolume:调用 Mounter.TearDown()UnmountDevice:调用 DeviceMounter.UnmountDevice()VerifyControllerAttachedVolume:轮询 Node Status 等待 AD Controller 完成附加ExpandInUseVolume:在线扩容ReconstructVolumeOperation:从磁盘重建卷
三、Container Manager 深度解析
3.1 模块整体结构
3.1.1 核心接口
// ContainerManager 顶层接口
type ContainerManager interface {
Start(node *v1.Node, activePods ActivePodsFunc, sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider, runtimeService internalapi.RuntimeService) error
NewPodContainerManager() PodContainerManager
GetPodCgroupRoot() string
GetMountedSubsystems() *CgroupSubsystems
GetQOSContainersInfo() QOSContainersInfo
UpdateQOSCgroups() error
Status() Status
GetNodeConfig() NodeConfig
GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)
GetPluginRegistrationHandler() cache.PluginHandler
UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
InternalContainerLifecycle() InternalContainerLifecycle
SystemCgroupsLimit() v1.ResourceList
}
3.1.2 containerManagerImpl 结构
type containerManagerImpl struct {
sync.RWMutex
cadvisorInterface cadvisor.Interface
mountUtil mount.Interface
NodeConfig
status Status
systemContainers []*systemContainer
periodicTasks []func()
subsystems *CgroupSubsystems
nodeInfo *v1.Node
cgroupManager CgroupManager
capacity v1.ResourceList
internalCapacity v1.ResourceList
cgroupRoot CgroupName
recorder record.EventRecorder
qosContainerManager QOSContainerManager
deviceManager devicemanager.Manager
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
topologyManager topologymanager.Manager
}
3.1.3 NodeConfig 关键配置
type NodeConfig struct {
CgroupRoot string
CgroupsPerQOS bool
CgroupDriver string
SystemCgroupsName string
KubeletCgroupsName string
ContainerRuntime string
ExperimentalCPUManagerPolicy string
ExperimentalCPUManagerReconcilePeriod time.Duration
ExperimentalMemoryManagerPolicy string
ExperimentalMemoryManagerReservedMemory []kubeletconfig.MemoryReservation
ExperimentalTopologyManagerPolicy string
ExperimentalTopologyManagerScope string
ReservedSystemCPUs string
NodeAllocatableConfig
KubeletRootDir string
}
3.2 初始化流程深度解析
NewContainerManager() 执行以下初始化步骤:
1. validateSystemRequirements() → 检查 cgroup 子系统是否挂载(cpu, cpuacct, cpuset, memory)
2. failSwapOn 检查 → 读取 /proc/swaps 验证 swap 未开启
3. 获取 MachineInfo → 计算节点 Capacity
4. 解析 CgroupRoot → 确保根 cgroup 存在
5. 初始化 QOSContainerManager
6. 初始化 TopologyManager(如果 feature gate 启用)
7. 初始化 DeviceManager → 注册为 HintProvider
8. 初始化 CPUManager → 注册为 HintProvider
9. 初始化 MemoryManager → 注册为 HintProvider
10. 返回 containerManagerImpl 实例
Start() 方法执行:
1. 启动 CPUManager(从 runtime 获取初始容器映射)
2. 启动 MemoryManager
3. 缓存 NodeInfo
4. 验证 Node Allocatable 配置
5. setupNode() → 设置 QoS cgroup、系统容器、内核参数
6. 启动 DeviceManager
3.3 CgroupManager 深度解析
3.3.1 接口定义
type CgroupManager interface {
Exists(name CgroupName) bool
Create(name CgroupName, resource *ResourceConfig) error
Destroy(name CgroupName) error
Update(name CgroupName, resource *ResourceConfig) error
Name(name CgroupName) string
Pids(name CgroupName) []int
ReduceCPULimits(cgroupName CgroupName) error
GetResourceStats(name CgroupName) (*ResourceStats, error)
}
3.3.2 CgroupName 与路径映射
CgroupName 是一个字符串切片,表示 cgroup 层级路径。两种驱动方式有不同的路径映射:
cgroupfs 驱动:
CgroupName{"kubepods", "burstable", "pod1234"} → /kubepods/burstable/pod1234/
systemd 驱动:
CgroupName{"kubepods", "burstable", "pod1234"} → /kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod1234.slice/
systemd 驱动的转换规则:
- 每个组件添加
.slice后缀 - 层级间用
-连接 - 组件名中的
-转义为_(escapeSystemdCgroupName) - 使用
cgroupsystemd.ExpandSlice()生成完整路径
3.3.3 cgroup 层次结构
当 CgroupsPerQOS=true 时,cgroup 层次如下:
/kubepods.slice/ (cgroupRoot)
├── /kubepods-pod1234.slice/ (Pod 级 cgroup, QoS=Guaranteed)
├── /kubepods-burstable.slice/ (Burstable QoS 级)
│ ├── /kubepods-burstable-pod5678.slice/ (Pod 级 cgroup)
│ └── ...
├── /kubepods-besteffort.slice/ (BestEffort QoS 级)
│ ├── /kubepods-besteffort-pod9012.slice/
│ └── ...
└── /kubepods-pod1234.slice/ (Guaranteed Pod 直接在 kubepods 下)
3.4 CPU Manager 深度解析
3.4.1 接口定义
type Manager interface {
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider, runtimeService internalapi.RuntimeService,
initialContainers containermap.ContainerMap) error
Allocate(pod *v1.Pod, container *v1.Container) error
RemoveContainer(containerID string) error
State() state.Reader
GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
GetHintPorviderName() string
GetAllocatableCPUs() cpuset.CPUSet
}
3.4.2 Static Policy 核心逻辑
Static Policy 是 CPU Manager 的核心策略,仅对 Guaranteed QoS 的 Pod 进行静态 CPU 分配:
准入条件:
- Pod QoS 为 Guaranteed(CPU Request == CPU Limit,且均为整数值)
- 请求的 CPU 为整数(如
1,2,4)
分配流程:
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
// 1. 非 Guaranteed Pod → 跳过
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { return nil }
// 2. 幂等检查:如果容器已有分配 → 跳过
if s.GetCPUAssignments(string(pod.UID), container.Name) != nil { return nil }
// 3. 计算需要的 CPU 数量
numCPUs := calculateCPUs(pod, container)
if numCPUs == 0 { return nil }
// 4. 从 TopologyManager 获取 NUMA 亲和性
hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
// 5. 基于 hint 计算可用的 CPU 集合
// 如果 hint 有 NUMA 亲和性 → 在指定 NUMA 节点上分配
// 否则 → 从默认 cpuset 分配
// 6. 执行分配,更新 state
// - 从 freeCPUs 中移除已分配的 CPU
// - 更新 defaultCPUs(非 Guaranteed Pod 可用的 CPU)
// - 记录分配关系到 checkpoint
}
Reconcile 机制:
CPU Manager 启动一个独立的 reconcile 循环,定期:
- 获取所有活跃 Pod
- 检查每个 Pod 的容器是否仍在运行
- 清理已终止容器的 CPU 分配(
removeStaleState) - 将当前分配写入 checkpoint
3.4.3 Checkpoint 持久化
CPU Manager 使用 cpu_manager_state 文件持久化分配状态:
type CPUManagerCheckpoint struct {
PolicyName string `json:"policyName"`
DefaultCPUSet string `json:"defaultCPUSet"`
Entries map[string]map[string]string `json:"entries"` // podUID → containerName → cpuset
Checksum checksum.Checksum `json:"checksum"`
}
启动时通过 state.NewCheckpointState() 从文件恢复,如果文件不存在或校验失败,则从初始状态重新开始。
3.5 Memory Manager 深度解析
3.5.1 接口定义
type Manager interface {
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider, containerRuntime runtimeService,
initialContainers containermap.ContainerMap) error
AddContainer(p *v1.Pod, c *v1.Container, containerID string)
Allocate(pod *v1.Pod, container *v1.Container) error
RemoveContainer(containerID string) error
State() state.Reader
GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
GetPodTopologyHints(*v1.Pod) map[string][]topologymanager.TopologyHint
GetHintPorviderName() string
GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Int
}
3.5.2 Static Policy 分配逻辑
Memory Manager 的 Static Policy 为 Guaranteed Pod 在 NUMA 节点级别静态分配内存:
准入条件:
- Pod QoS 为 Guaranteed
- Memory Request == Memory Limit
分配流程(policy_static.go 逐行解析):
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
// 1. 仅 Guaranteed Pod
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { return nil }
// 2. 幂等检查
if blocks := s.GetMemoryBlocks(string(pod.UID), container.Name); blocks != nil { return nil }
// 3. 获取 NUMA 亲和性
hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
// 4. 如果 hint 无 NUMA 亲和性 → 计算默认 hint
if hint.NUMANodeAffinity == nil {
defaultHint, err := p.getDefaultHint(s, requestedResources)
bestHint = defaultHint
}
// 5. 如果 hint 不满足请求 → 扩展 hint
if !isAffinitySatisfyRequest(machineState, bestHint.NUMANodeAffinity, requestedResources) {
extendedHint, err := p.extendTopologyManagerHint(s, requestedResources, bestHint.NUMANodeAffinity)
bestHint = extendedHint
}
// 6. 在 NUMA 节点上分配内存块
for resourceName, requestedSize := range requestedResources {
containerBlocks = append(containerBlocks, state.Block{
NUMAAffinity: maskBits,
Size: requestedSize,
Type: resourceName, // ResourceMemory 或 Hugepages-*
})
// 7. 更新机器内存状态
for _, nodeID := range maskBits {
// 从 Free 转移到 Reserved
if nodeResourceMemoryState.Free >= requestedSize {
nodeResourceMemoryState.Reserved += requestedSize
nodeResourceMemoryState.Free -= requestedSize
requestedSize = 0
} else {
requestedSize -= nodeResourceMemoryState.Free
nodeResourceMemoryState.Reserved += nodeResourceMemoryState.Free
nodeResourceMemoryState.Free = 0
}
}
}
}
3.5.3 Memory Block 数据结构
type Block struct {
NUMAAffinity []int // NUMA 节点 ID 列表
Size uint64 // 分配大小
Type v1.ResourceName // 资源类型(memory 或 hugepages-*)
}
3.5.4 Machine State
Memory Manager 维护每个 NUMA 节点的内存状态:
type NUMANodeState struct {
NumberOfAssignments int // 分配计数
Cells []int // NUMA 亲和性组
MemoryMap map[v1.ResourceName]*MemoryState // 每种内存类型的状态
}
type MemoryState struct {
Free uint64 // 可用内存
Reserved uint64 // 已保留内存
}
3.5.5 removeStaleState
与 CPU Manager 类似,Memory Manager 在每次 Allocate 和 GetTopologyHints 调用前都会执行 removeStaleState():
- 检查 sourcesReady 是否就绪
- 获取活跃 Pod 列表
- 构建活跃容器集合
- 遍历 MemoryAssignments,移除不在活跃集合中的容器
- 释放对应的内存块
3.6 Device Manager 深度解析
3.6.1 接口定义
type Manager interface {
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
GetWatcherHandler() cache.PluginHandler
Allocate(pod *v1.Pod, container *v1.Container) error
UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error)
GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
GetHintPorviderName() string
GetAllocatableDevices() []*Device
}
3.6.2 Device Plugin 交互模型
Device Manager 通过 gRPC 与 Device Plugin 交互,遵循 Kubernetes Device Plugin API(v1beta1):
注册流程:
- Device Plugin 在
/var/lib/kubelet/plugins_registry/创建 Socket - Plugin Watcher 检测到 Socket → 触发注册流程
- Device Manager 通过 gRPC 连接到 Plugin
- 调用
GetDevicePluginOptions()获取选项 - 启动
ListAndWatch流获取设备列表 - 设备信息缓存到 endpoint
Endpoint 数据结构:
type endpointImpl struct {
client pluginapi.DevicePluginClient
clientConn *grpc.ClientConn
socketPath string
resourceName string
stopTime time.Time
mutex sync.Mutex
cb monitorCallback // 设备状态变化回调
}
Allocate 流程:
- 从 State 中获取容器的设备分配
- 对每个资源类型,调用对应 endpoint 的
allocate()方法 - Device Plugin 返回 ContainerResponse(包含设备路径、挂载点、环境变量等)
- 组装为 RunContainerOptions
3.6.3 设备分配状态
type Device struct {
ID string // 设备 ID
Health string // 健康状态
Topology *pluginapi.TopologyInfo // NUMA 拓扑信息
}
type DeviceInstanceInfo struct {
deviceIds sets.String // 已分配的设备 ID
allocFunc func(resource string, devs []string) (*pluginapi.AllocateResponse, error)
}
3.7 Topology Manager 深度解析
3.7.1 核心接口
type Manager interface {
lifecycle.PodAdmitHandler
AddHintProvider(HintProvider)
AddContainer(pod *v1.Pod, container *v1.Container, containerID string)
RemoveContainer(containerID string) error
Store
}
type HintProvider interface {
GetHintPorviderName() string
GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]TopologyHint
GetPodTopologyHints(pod *v1.Pod) map[string][]TopologyHint
Allocate(pod *v1.Pod, container *v1.Container) error
}
type TopologyHint struct {
NUMANodeAffinity bitmask.BitMask
Preferred bool
}
3.7.2 策略体系
Topology Manager 支持四种策略:
| 策略 | 类名 | 行为 |
|---|---|---|
none |
nonePolicy |
不做拓扑协调,仅调用 Allocate |
best-effort |
bestEffortPolicy |
尽力满足 NUMA 亲和性,即使无法满足也允许 Pod 运行 |
restricted |
restrictedPolicy |
必须满足 NUMA 亲和性,否则拒绝 Pod |
single-numa-node |
singleNumaNodePolicy |
所有资源必须分配在同一 NUMA 节点上,否则拒绝 Pod |
策略继承关系:
nonePolicy → 直接调用 Allocate
bestEffortPolicy → Merge 返回 (hint, true) // 总是 admit
restrictedPolicy → Merge 返回 (hint, admit) // 有亲和性才 admit
singleNumaNodePolicy → 继承 restrictedPolicy + 额外限制只允许单 NUMA
3.7.3 Hint 合并算法
Merge() 是 Topology Manager 的核心算法,将多个 HintProvider 返回的 hints 合并为一个最优 hint:
1. filterProvidersHints():将所有 provider 的 hints 扁平化
- 如果 provider 返回空 hints → 插入 {nil, true}(无偏好)
- 如果 provider 返回空列表 → 插入 {nil, false}(无可能分配)
2. iterateAllProviderTopologyHints():遍历所有排列组合
- 递归枚举所有 provider hints 的笛卡尔积
- 对每个排列调用 mergePermutation()
3. mergePermutation():合并排列中的所有 hints
- NUMANodeAffinity = 所有 hints 的 Bitwise AND
- Preferred = 所有 hints 都 Preferred 时才为 true
4. 选择 bestHint:
- Preferred 优先于 non-Preferred
- 同等 Preferred 时,更窄的 NUMANodeAffinity 优先
- Count()==0 的 hint 被忽略
3.7.4 Scope:Container vs Pod
Topology Manager 支持两种作用域:
Container Scope (scope_container.go):
- 对 Pod 中的每个容器独立计算拓扑亲和性
- 每个容器可以有不同的 NUMA 亲和性
- Admit 流程:遍历所有容器 →
accumulateProvidersHints()→calculateAffinity()→allocateAlignedResources()
Pod Scope (scope_pod.go):
- 整个 Pod 共享一个拓扑亲和性
- 使用
GetPodTopologyHints()而非GetTopologyHints() - Admit 流程:一次性计算 Pod 级 hint → 所有容器共享同一 hint → 逐一 Allocate
3.7.5 NUMA 节点数限制
const maxAllowableNUMANodes = 8
超过 8 个 NUMA 节点时,hint 排列组合将导致状态爆炸,Topology Manager 拒绝初始化。
3.8 内部容器生命周期接口
type InternalContainerLifecycle interface {
PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error
PostStopContainer(pod *v1.Pod, containerID containerID) error
}
type internalContainerLifecycleImpl struct {
cpuManager cpumanager.Manager
memoryManager memorymanager.Manager
topologyManager topologymanager.Manager
}
PreStartContainer 在容器启动前调用:
topologyManager.AddContainer()cpuManager.AddContainer()memoryManager.AddContainer()
四、Plugin Manager 深度解析
4.1 模块整体结构
4.1.1 核心接口与实现
type PluginManager interface {
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
AddHandler(pluginType string, pluginHandler cache.PluginHandler)
}
type pluginManager struct {
desiredStateOfWorldPopulator *pluginwatcher.Watcher
reconciler reconciler.Reconciler
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
}
4.1.2 PluginHandler 接口
type PluginHandler interface {
ValidatePlugin(pluginName string, endpoint string, versions []string) error
RegisterPlugin(pluginName, endpoint string, versions []string) error
DeRegisterPlugin(pluginName string)
}
状态机:
Socket Created → Validate → Register → Socket Deleted → DeRegister
↑ |
| | Socket created with same name
+-----------+ (ReRegistration)
关键约束:对于同一插件名,操作严格串行。例如:Register(“foo”) 未完成前,不会收到 DeRegister(“foo”) 或 Validate(“foo”, differentEndpoint) 调用。
4.2 Plugin Watcher 深度解析
4.2.1 核心数据结构
type Watcher struct {
path string // 监听的 Socket 目录
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
desiredStateOfWorld cache.DesiredStateOfWorld
}
4.2.2 Start 流程
func (w *Watcher) Start(stopCh <-chan struct{}) error {
// 1. 初始化:创建目录
w.init()
// 2. 创建 fsnotify Watcher
fsWatcher, _ := fsnotify.NewWatcher()
// 3. 遍历已有 Socket 文件(处理 Kubelet 重启后插件已在运行的情况)
w.traversePluginDir(w.path)
// 4. 启动事件监听 goroutine
go func(fsWatcher *fsnotify.Watcher) {
for {
select {
case event := <-fsWatcher.Events:
if event.Op & fsnotify.Create:
w.handleCreateEvent(event) // → AddOrUpdatePlugin to DSW
elif event.Op & fsnotify.Remove:
w.handleDeleteEvent(event) // → RemovePlugin from DSW
case err := <-fsWatcher.Errors:
// log error
case <-stopCh:
w.fsWatcher.Close()
return
}
}
}(fsWatcher)
}
4.2.3 handleCreateEvent 事件处理
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
// 1. 忽略以 '.' 开头的文件
if strings.HasPrefix(fi.Name(), ".") { return nil }
// 2. 如果是普通文件:
if !fi.IsDir() {
// 2a. 检查是否为 Unix Domain Socket
isSocket, _ := util.IsUnixDomainSocket(event.Name)
if !isSocket { return nil }
// 2b. 添加到 DSW(或更新时间戳)
w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
}
// 3. 如果是目录:递归遍历
return w.traversePluginDir(event.Name)
}
4.3 Plugin Manager Reconciler 深度解析
4.3.1 核心数据结构
type reconciler struct {
operationExecutor operationexecutor.OperationExecutor
loopSleepDuration time.Duration // 默认 1s
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
handlers map[string]cache.PluginHandler // pluginType → handler
sync.RWMutex
}
4.3.2 reconcile 流程
func (rc *reconciler) reconcile() {
// 阶段一:反注册不再需要的插件(先于注册执行)
for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {
unregisterPlugin := false
// 情况A: 插件不在 DSW 中 → 需要反注册
if !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {
unregisterPlugin = true
} else {
// 情况B: 插件在 DSW 中但时间戳不同 → 需要先反注册再重新注册
for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if dswPlugin.SocketPath == registeredPlugin.SocketPath &&
dswPlugin.Timestamp != registeredPlugin.Timestamp {
unregisterPlugin = true
break
}
}
}
if unregisterPlugin {
rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
}
}
// 阶段二:注册需要的插件
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
rc.operationExecutor.RegisterPlugin(
pluginToRegister.SocketPath,
pluginToRegister.Timestamp,
rc.getHandlers(), // 传入所有注册的 Handler
rc.actualStateOfWorld)
}
}
}
4.4 OperationExecutor 与 OperationGenerator
4.4.1 Register 操作流程
1. OperationExecutor.RegisterPlugin(socketPath, timestamp, handlers, asw)
2. 通过 goroutinemap 确保同一 socket 不会并发操作
3. 启动 goroutine 执行:
a. 通过 Socket 建立连接
b. 获取 Plugin Info(名称、版本)
c. 遍历 handlers:
- handler.ValidatePlugin(pluginName, endpoint, versions)
- handler.RegisterPlugin(pluginName, endpoint, versions)
d. 成功后:asw.AddPlugin(pluginInfo)
4.4.2 Unregister 操作流程
1. OperationExecutor.UnregisterPlugin(registeredPlugin, asw)
2. 启动 goroutine 执行:
a. 遍历 handlers:
- handler.DeRegisterPlugin(pluginName)
b. asw.RemovePlugin(socketPath)
五、辅助子系统
5.1 Checkpoint Manager
5.1.1 接口
type Checkpoint interface {
MarshalCheckpoint() ([]byte, error)
UnmarshalCheckpoint(blob []byte) error
VerifyChecksum() error
}
type CheckpointManager interface {
CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error
GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error
RemoveCheckpoint(checkpointKey string) error
ListCheckpoints() ([]string, error)
}
5.1.2 实现
type impl struct {
path string
store utilstore.Store // FileStore 后端
mutex sync.Mutex
}
CreateCheckpoint:序列化 → 写入文件GetCheckpoint:读取文件 → 反序列化 → VerifyChecksum- 底层使用
utilstore.FileStore,文件存储在--root-dir下的 checkpoint 目录 - Checksum 使用
checksum.Checksum类型(通常为 CRC32 或类似算法)
使用场景:
- CPU Manager:
cpu_manager_state文件 - Memory Manager:
memory_manager_state文件 - Device Manager:
kubelet_internal_checkpoint文件
5.2 Certificate Manager
Certificate Manager 负责 kubelet 的证书轮换和引导:
certificate/transport.go:管理传输层证书certificate/bootstrap/bootstrap.go:TLS 引导流程certificate/kubelet.go:kubelet 证书管理
核心功能:
- Bootstrap:kubelet 启动时使用 bootstrap token 获取初始证书
- Rotation:证书接近过期时自动轮换
- Storage:将证书和密钥存储到指定目录
5.3 Cloud Resource Manager
type CloudRequestManager interface {
Run(stopCh <-chan struct{})
GetCloudNode(node *v1.Node) (*v1.Node, error)
}
负责与云提供商交互,获取节点信息(如云实例类型、区域等),更新 Node 对象。
六、模块间协作与集成
6.1 Pod 准入流程中的资源分配
当 Pod 被调度到节点后,Kubelet 通过 Pod Admit Handler 链进行准入检查:
1. TopologyManager.Admit() (如果启用)
├── 对每个容器调用 HintProvider.GetTopologyHints()
│ ├── CPUManager.GetTopologyHints()
│ ├── MemoryManager.GetTopologyHints()
│ └── DeviceManager.GetTopologyHints()
├── Policy.Merge() 合并 hints
├── 如果 admit → 对每个容器调用 HintProvider.Allocate()
│ ├── CPUManager.Allocate() → 静态 CPU 分配
│ ├── MemoryManager.Allocate() → NUMA 内存分配
│ └── DeviceManager.Allocate() → 设备分配
└── 如果 !admit → 拒绝 Pod
2. resourceAllocator.Admit() (Topology Manager 未启用时的 fallback)
├── DeviceManager.Allocate()
├── CPUManager.Allocate()
└── MemoryManager.Allocate()
6.2 Container Manager 与 Plugin Manager 的集成
Container Manager 通过 GetPluginRegistrationHandler() 返回 Device Manager 的 Watcher Handler,由 Plugin Manager 在注册 Device Plugin 时调用:
PluginManager.AddHandler("device-plugin", deviceManager.GetWatcherHandler())
当 Device Plugin 注册时:
- Plugin Watcher 检测到 Socket
- Plugin Reconciler 触发 RegisterPlugin
- 调用 Device Manager 的
ValidatePlugin()→RegisterPlugin() - Device Manager 创建 gRPC endpoint,启动 ListAndWatch
6.3 Volume Manager 与 Container Manager 的交互
两者在 Pod 生命周期中协作但职责独立:
- Volume Manager 在 SyncPod 之前完成卷的 Attach/Mount
- Container Manager 在 Pod 准入阶段完成资源分配
- Container Runtime 创建容器时同时需要:
- Volume Manager 提供的挂载点信息
- Container Manager 提供的设备、CPU、内存配置
6.4 Checkpoint 一致性
所有子 Manager 共享相同的 Checkpoint Manager 基础设施,但使用不同的 checkpoint 文件:
- CPU Manager 写入
cpu_manager_state - Memory Manager 写入
memory_manager_state - Device Manager 写入
kubelet_internal_checkpoint
每个 checkpoint 文件包含:
- Policy 名称
- 分配映射(Pod → Container → 资源分配)
- 机器状态
- Checksum 校验
Kubelet 重启后,各 Manager 从 checkpoint 恢复状态,然后通过 reconcile 机制与运行时实际状态对齐。
七、Mermaid 图集
图1:Volume Manager 整体架构图
图2:ASW/DSW Reconciliation 流程图
图3:Volume Plugin 接口层次图
图4:Container Manager 架构图
图5:cgroup 层次结构图
图6:CPU Manager Static Policy 流程图
图7:Memory Manager Static Policy 流程图
图8:Device Plugin 交互时序图
图9:Plugin Manager 注册流程图
图10:Checkpoint 持久化流程图
图11:Topology Manager 准入流程图
图12:Volume Attach/Detach 完整流程图
附录A:关键数据流总结
Volume Manager 数据流
PodManager.GetPods()
→ Populator.findAndAddNewPods()
→ createVolumeSpec() [PVC→PV解析, CSI迁移]
→ DSW.AddPodToVolume()
→ Populator.findAndRemoveDeletedPods()
→ DSW.DeletePodFromVolume()
Reconciler.reconcile() [每100ms]
→ ASW.GetAllMountedVolumes() vs DSW.PodExistsInVolume()
→ OE.UnmountVolume()
→ DSW.GetVolumesToMount() vs ASW.PodExistsInVolume()
→ OE.AttachVolume() / VerifyControllerAttachedVolume()
→ OE.MountVolume() / ExpandInUseVolume()
→ ASW.GetUnmountedVolumes() vs DSW.VolumeExists()
→ OE.UnmountDevice() / DetachVolume()
Container Manager 数据流
Kubelet Pod Admit:
→ TopologyManager.Admit()
→ [CPU/Memory/Device]Manager.GetTopologyHints()
→ Policy.Merge() → bestHint
→ [CPU/Memory/Device]Manager.Allocate()
Kubelet SyncPod:
→ ContainerManager.GetResources()
→ DeviceManager.GetDeviceRunContainerOptions()
→ InternalContainerLifecycle.PreStartContainer()
→ TopologyManager.AddContainer()
→ CPUManager.AddContainer()
→ MemoryManager.AddContainer()
Container Removal:
→ CPUManager.RemoveContainer()
→ MemoryManager.RemoveContainer()
→ TopologyManager.RemoveContainer()
Plugin Manager 数据流
fsnotify Event (Socket Create/Delete)
→ PluginWatcher.handleCreateEvent/handleDeleteEvent
→ DSW.AddOrUpdatePlugin / RemovePlugin
Reconciler.reconcile() [每1s]
→ ASW.GetRegisteredPlugins() vs DSW.PluginExists()
→ OE.UnregisterPlugin() → handler.DeRegisterPlugin()
→ DSW.GetPluginsToRegister() vs ASW.PluginExistsWithCorrectTimestamp()
→ OE.RegisterPlugin() → handler.ValidatePlugin() → handler.RegisterPlugin()
附录B:配置参数与Feature Gate影响
| Feature Gate | 影响的模块 | 行为变化 |
|---|---|---|
CPUManager |
CPU Manager | 启用 CPU 亲和性管理 |
MemoryManager |
Memory Manager | 启用 NUMA 内存分配 |
TopologyManager |
Topology Manager | 启用拓扑感知资源协调 |
DevicePlugins |
Device Manager | 启用 Device Plugin 支持 |
ExpandInUsePersistentVolumes |
Volume Manager Populator | 启用在线卷扩容 |
BlockVolume |
Volume Manager | 支持块卷模式 |
GenericEphemeralVolume |
Volume Manager Populator | 支持 Generic Ephemeral 内联卷 |
StorageObjectInUseProtection |
Volume Manager Populator | PVC 删除保护 |
CSIMigration |
Volume Manager | CSI 迁移支持 |
| 配置参数 | 影响的模块 | 默认值 |
|---|---|---|
--cgroups-per-qos |
Container Manager | true |
--cgroup-driver |
Container Manager | cgroupfs |
--cpu-manager-policy |
CPU Manager | none |
--cpu-manager-reconcile-period |
CPU Manager | 10s |
--memory-manager-policy |
Memory Manager | none |
--topology-manager-policy |
Topology Manager | none |
--topology-manager-scope |
Topology Manager | container |
--fail-swap-on |
Container Manager | true |
--keep-terminated-pod-volumes |
Volume Manager | false |
附录C:错误处理与恢复机制
Volume Manager 错误处理
- 操作冲突:通过
goroutinemap和nestedpendingoperations防止同一卷的并发操作 - 指数退避:操作失败后通过
exponentialbackoff逐渐增加重试间隔 - Pod 错误聚合:DSW 通过
AddErrorToPod/PopPodErrors聚合 Pod 级别的卷错误 - Kubelet 重启恢复:
syncStates()从磁盘扫描恢复卷状态
Container Manager 错误处理
- Checkpoint 校验:启动时通过
VerifyChecksum()验证 checkpoint 完整性 - Stale State 清理:各子 Manager 定期执行
removeStaleState(),清理已终止容器的分配 - Pod 准入拒绝:Topology Manager 和 resourceAllocator 在分配失败时拒绝 Pod
- Cgroup 重建:
setupNode()在启动时重建 QoS cgroup 层次
Plugin Manager 错误处理
- 注册失败:ValidatePlugin/RegisterPlugin 失败时不更新 ASW,下次循环重试
- ReRegistration:Socket 文件重建时通过时间戳检测,先反注册再注册
- Socket 消失:fsnotify 的 Remove 事件触发 DSW 移除,Reconciler 执行反注册
更多推荐

所有评论(0)