【技术】从POD创建看Kubernetes源码实现 (六)- containerd
本文分析了Kubernetes创建POD时containerd侧的代码流程。主要内容包括:1) containerd作为容器运行时通过CRI插件与Kubernetes集成;2) 详细解析RunPodSandbox函数的实现过程,包括网络命名空间创建、沙箱元数据管理等;3) 介绍CreateSandbox函数如何作为适配器将CRI请求委托给具体沙箱控制器。文章基于Kubernetes v1.34版本
✍️ 作者:茶水间Tech
🏷️ 标签:#云计算#云原生#kubernetes#容器
📖 前言
kubernetes的模块比较多,架构复杂,代码量更是庞大,看代码比较麻烦,我们从现实场景出发,从创建POD分析在Kubernetes内部的代码流程,本系列文章从POD创建,整体梳理Kubernetes源码实现,其中本节主要分析containerd侧的流程实现。
本文基于 Client Version: v1.34.3 , Server Version: v1.34.2
📌 POD创建的整体架构图:

💻 正文
📑 一、关于 containerd
containerd 是一个高效、可靠的开源容器运行时,它被设计为从开发到生产环境的核心容器管理解决方案。
containerd 的生态系统包括一系列与其集成的工具和组件,这些工具和组件扩展了 containerd 的功能并增强了其适用性。
- CRI 插件:与 Kubernetes 紧密集成,通过实现 Container Runtime Interface (CRI),使 Kubernetes 能够管理容器。
- CNI 插件:使用 Container Network Interface (CNI)插件进行网络管理,提供容器的网络连接。
- CSI 插件:Container Storage Interface (CSI)插件用于存储管理,允许容器挂载和管理存储卷。
- 镜像管理:支持 Docker 镜像和 OCI 镜像规范,提供从镜像仓库拉取、存储和管理容器镜像的能力。
- 插件机制:允许通过插件扩展 containerd 的功能,满足特定的需求。

作为CRI插件,与 Kubernetes 紧密集成,通过实现 Container Runtime Interface (CRI),使 Kubernetes 能够管理容器。

📑 二、代码分析
RunPodSandbox (CRI 层)
↓
CreateSandbox (sandbox_service.go)
↓
Create (podsandbox.Controller) - 创建元数据
↓
Start (podsandbox.Controller) ← 启动沙箱
↓
client.NewContainer - 创建容器
↓
container.NewTask - 创建任务
↓
task.Start - 启动进程
2.1 Containerd创建沙箱:RunPodSandbox(sandbox_run.go)
[RunPodSandbox] 函数实现了 CRI (Container Runtime Interface) 的 RunPodSandbox API,用于创建和启动 Pod 沙箱。Pod 沙箱是容器运行的环境,包含网络命名空间、cgroups 等资源。这是 Kubernetes 通过 CRI 与容器运行时交互的核心接口之一。
代码路径:containerd/internal/cri/server/sandbox_run.go
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
// ...(略)
// 生成唯一 ID
id := util.GenerateID()
// ...(略)
// 预留 Sandbox 名称,避免并发创建相同的 Sandbox
if err := c.sandboxNameIndex.Reserve(name, id); err != nil {
return nil, fmt.Errorf("failed to reserve sandbox name %q: %w", name, err)
}
defer func() {
// Release the name if the function returns with an error.
// When cleanupErr != nil, the name will be cleaned in sandbox_remove.
if retErr != nil && cleanupErr == nil {
c.sandboxNameIndex.ReleaseByName(name)
}
}()
// 创建租约(用于资源管理和垃圾回收)
leaseSvc := c.client.LeasesService()
ls, lerr := leaseSvc.Create(ctx, leases.WithID(id))
// ...(略)
// Setup the network namespace if host networking wasn't requested.
//网络命名空间创建(关键步骤)
if !hostNetwork(config) {
span.AddEvent("setup pod network")
netStart := time.Now()
// If it is not in host network namespace then create a namespace and set the sandbox
// handle. NetNSPath in sandbox metadata and NetNS is non empty only for non host network
// namespaces. If the pod is in host network namespace then both are empty and should not
// be used.
// 创建网络命名空间
var netnsMountDir = "/var/run/netns"
if c.config.NetNSMountsUnderStateDir {
netnsMountDir = filepath.Join(c.config.StateDir, "netns")
}
// 根据是否启用用户命名空间选择创建方式
if !userNsEnabled {
sandbox.NetNS, err = netns.NewNetNS(netnsMountDir)
} else {
usernsOpts := config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetUsernsOptions()
sandbox.NetNS, err = c.setupNetnsWithinUserns(netnsMountDir, usernsOpts)
}
if err != nil {
return nil, fmt.Errorf("failed to create network namespace for sandbox %q: %w", id, err)
}
// Update network namespace in the store, which is used to generate the container's spec
sandbox.NetNSPath = sandbox.NetNS.GetPath()
//...(略)
// 设置 Pod 网络
if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
}
sandboxCreateNetworkTimer.UpdateSince(netStart)
if err := sandboxInfo.AddExtension(podsandbox.MetadataKey, &sandbox.Metadata); err != nil {
return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
}
// Save sandbox metadata to store
if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
return nil, fmt.Errorf("unable to save sandbox %q to sandbox store: %w", id, err)
}
}
//创建沙箱
if err := c.sandboxService.CreateSandbox(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil {
return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err)
}
//启动沙箱
ctrl, err := c.sandboxService.StartSandbox(ctx, sandbox.Sandboxer, id)
// ...(略)
}
2.2 创建沙箱: CreateSandbox(sandbox_service.go)
[CreateSandbox] 函数是 CRI 沙箱服务的核心方法之一,负责创建 Pod 沙箱。它作为一个适配器/包装器,将 CRI 层的沙箱创建请求委托给具体的沙箱控制器实现。这个函数支持多种沙箱类型(如 Pod 沙箱、VM 沙箱等),通过 Sandboxer 字段选择合适的控制器。
代码路径:containerd/internal/cri/server/sandbox_service.go
func (c *criSandboxService) CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error {
ctrl, err := c.SandboxController(info.Sandboxer)
if err != nil {
return err
}
return ctrl.Create(ctx, info, opts...)
}
2.3 创建沙箱:Create (controller_service.go)
本文nginx 是普通pod,基于runc,普通 Pod: 使用 podsandbox 控制器,不走 shim,直接创建 pause 容器作为沙箱
何时走 Shim
- 配置中明确指定:
sandboxer = "shim" - VM 沙箱场景: 如 Kata Containers、Firecracker 等
- 需要更强隔离: 通过独立的 shim 进程提供隔离
Create方法是 [podsandbox.Controller]的核心方法之一,负责创建 Pod 沙箱的元数据记录。
关键特点:
- 只创建元数据,不创建实际的容器或进程
- 延迟创建:实际的 pause 容器在 [
Start]方法中创建 - 状态初始化:将沙箱状态设置为 [
sandboxstore.StateUnknown]
这种设计遵循了"创建-启动分离"的模式,允许在创建和启动之间进行配置和验证。
代码路径:containerd/internal/cri/server/podsandbox/sandbox_run.go
func (c *Controller) Create(_ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error {
metadata := sandboxstore.Metadata{}
if err := info.GetExtension(MetadataKey, &metadata); err != nil {
return fmt.Errorf("failed to get sandbox %q metadata: %w", info.ID, err)
}
podSandbox := types.NewPodSandbox(info.ID, sandboxstore.Status{State: sandboxstore.StateUnknown})
podSandbox.Metadata = metadata
podSandbox.Runtime = info.Runtime
return c.store.Save(podSandbox)
}
2.4 启动沙箱:StartSandbox(sandbox_service.go)
调用控制器启动沙箱
代码路径:containerd/internal/cri/server/sandbox_service.go
func (c *criSandboxService) StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error) {
ctrl, err := c.SandboxController(sandboxer)
if err != nil {
return sandbox.ControllerInstance{}, err
}
return ctrl.Start(ctx, sandboxID)
}
[Start] 方法是 podsandbox.Controller的核心方法,负责创建并启动 Pod 沙箱容器(pause 容器)。
主要功能:
- 创建 pause 容器作为 Pod 的网络和命名空间沙箱
- 配置 OCI 运行时规范(spec)
- 创建 containerd 容器和任务
- 启动沙箱进程
- 设置监控和清理机制
关键特点:
- 直接使用 containerd 客户端,不通过 shim
- 完整的资源管理:包括目录、容器、任务、文件等
- 健壮的错误处理:多层 defer 确保资源清理
- 状态管理:将沙箱状态从 Unknown 更新为 Ready
代码路径:containerd/internal/cri/server/podsandbox/sandbox_run.go
func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.ControllerInstance, retErr error) {
var cleanupErr error
defer func() {
if retErr != nil && cleanupErr != nil {
log.G(ctx).WithField("id", id).WithError(cleanupErr).Errorf("failed to fully teardown sandbox resources after earlier error: %s", retErr)
retErr = errors.Join(retErr, CleanupErr{cleanupErr})
}
}()
podSandbox := c.store.Get(id)
if podSandbox == nil {
return cin, fmt.Errorf("unable to find pod sandbox with id %q: %w", id, errdefs.ErrNotFound)
}
metadata := podSandbox.Metadata
var (
config = metadata.Config
labels = map[string]string{}
)
sandboxImage := c.getSandboxImageName()
// Ensure sandbox container image snapshot.
//确保沙箱镜像存在
image, err := c.ensureImageExists(ctx, sandboxImage, config, metadata.RuntimeHandler)
if err != nil {
return cin, fmt.Errorf("failed to get sandbox image %q: %w", sandboxImage, err)
}
containerdImage, err := c.toContainerdImage(ctx, *image)
if err != nil {
return cin, fmt.Errorf("failed to get image from containerd %q: %w", image.ID, err)
}
ociRuntime, err := c.config.GetSandboxRuntime(config, metadata.RuntimeHandler)
if err != nil {
return cin, fmt.Errorf("failed to get sandbox runtime: %w", err)
}
log.G(ctx).WithField("podsandboxid", id).Debugf("use OCI runtime %+v", ociRuntime)
labels["oci_runtime_type"] = ociRuntime.Type
// Create sandbox container root directories.
//创建沙箱根目录
sandboxRootDir := c.getSandboxRootDir(id)
if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil {
return cin, fmt.Errorf("failed to create sandbox root directory %q: %w",
sandboxRootDir, err)
}
defer func() {
if retErr != nil && cleanupErr == nil {
// Cleanup the sandbox root directory.
if cleanupErr = c.os.RemoveAll(sandboxRootDir); cleanupErr != nil {
log.G(ctx).WithError(cleanupErr).Errorf("Failed to remove sandbox root directory %q",
sandboxRootDir)
}
}
}()
volatileSandboxRootDir := c.getVolatileSandboxRootDir(id)
if err := c.os.MkdirAll(volatileSandboxRootDir, 0755); err != nil {
return cin, fmt.Errorf("failed to create volatile sandbox root directory %q: %w",
volatileSandboxRootDir, err)
}
defer func() {
if retErr != nil && cleanupErr == nil {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
// Cleanup the volatile sandbox root directory.
if cleanupErr = ensureRemoveAll(deferCtx, volatileSandboxRootDir); cleanupErr != nil {
log.G(ctx).WithError(cleanupErr).Errorf("Failed to remove volatile sandbox root directory %q",
volatileSandboxRootDir)
}
}
}()
// Create sandbox container.
// NOTE: sandboxContainerSpec SHOULD NOT have side
// effect, e.g. accessing/creating files, so that we can test
// it safely.
//生成 OCI spec
spec, err := c.sandboxContainerSpec(id, config, &image.ImageSpec.Config, metadata.NetNSPath, ociRuntime.PodAnnotations)
if err != nil {
return cin, fmt.Errorf("failed to generate sandbox container spec: %w", err)
}
log.G(ctx).WithField("podsandboxid", id).Debugf("sandbox container spec: %#+v", spew.NewFormatter(spec))
//处理 SELinux 和特权容器
metadata.ProcessLabel = spec.Process.SelinuxLabel
defer func() {
if retErr != nil {
selinux.ReleaseLabel(metadata.ProcessLabel)
}
}()
labels["selinux_label"] = metadata.ProcessLabel
// handle any KVM based runtime
if err := modifyProcessLabel(ociRuntime.Type, spec); err != nil {
return cin, err
}
if config.GetLinux().GetSecurityContext().GetPrivileged() {
// If privileged don't set selinux label, but we still record the MCS label so that
// the unused label can be freed later.
spec.Process.SelinuxLabel = ""
// If privileged is enabled, sysfs should have the rw attribute
for i, k := range spec.Mounts {
if filepath.Clean(k.Destination) == "/sys" {
for j, v := range spec.Mounts[i].Options {
if v == "ro" {
spec.Mounts[i].Options[j] = "rw"
break
}
}
break
}
}
}
// Generate spec options that will be applied to the spec later.
specOpts, err := c.sandboxContainerSpecOpts(config, &image.ImageSpec.Config)
if err != nil {
return cin, fmt.Errorf("failed to generate sandbox container spec options: %w", err)
}
sandboxLabels := ctrdutil.BuildLabels(config.Labels, image.ImageSpec.Config.Labels, crilabels.ContainerKindSandbox)
snapshotterOpt := []snapshots.Opt{snapshots.WithLabels(snapshots.FilterInheritedLabels(config.Annotations))}
extraSOpts, err := sandboxSnapshotterOpts(config)
if err != nil {
return cin, err
}
snapshotterOpt = append(snapshotterOpt, extraSOpts...)
opts := []containerd.NewContainerOpts{
containerd.WithSnapshotter(c.imageService.RuntimeSnapshotter(ctx, ociRuntime)),
customopts.WithNewSnapshot(id, containerdImage, snapshotterOpt...),
containerd.WithSpec(spec, specOpts...),
containerd.WithContainerLabels(sandboxLabels),
containerd.WithContainerExtension(crilabels.SandboxMetadataExtension, &metadata),
containerd.WithRuntime(ociRuntime.Type, podSandbox.Runtime.Options),
}
// 创建 containerd 容器
container, err := c.client.NewContainer(ctx, id, opts...)
if err != nil {
return cin, fmt.Errorf("failed to create containerd container: %w", err)
}
podSandbox.Container = container
defer func() {
if retErr != nil && cleanupErr == nil {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
if cleanupErr = container.Delete(deferCtx, containerd.WithSnapshotCleanup); cleanupErr != nil {
log.G(ctx).WithError(cleanupErr).Errorf("Failed to delete containerd container %q", id)
}
podSandbox.Container = nil
}
}()
// Setup files required for the sandbox.
//设置沙箱文件
if err = c.setupSandboxFiles(id, config); err != nil {
return cin, fmt.Errorf("failed to setup sandbox files: %w", err)
}
defer func() {
if retErr != nil && cleanupErr == nil {
if cleanupErr = c.cleanupSandboxFiles(id, config); cleanupErr != nil {
log.G(ctx).WithError(cleanupErr).Errorf("Failed to cleanup sandbox files in %q",
sandboxRootDir)
}
}
}()
// Update sandbox created timestamp.
info, err := container.Info(ctx)
if err != nil {
return cin, fmt.Errorf("failed to get sandbox container info: %w", err)
}
// Create sandbox task in containerd.
log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name)
var taskOpts []containerd.NewTaskOpts
if ociRuntime.Path != "" {
taskOpts = append(taskOpts, containerd.WithRuntimePath(ociRuntime.Path))
}
// We don't need stdio for sandbox container.
//创建和启动任务
task, err := container.NewTask(ctx, containerdio.NullIO, taskOpts...)
if err != nil {
return cin, fmt.Errorf("failed to create containerd task: %w", err)
}
defer func() {
if retErr != nil && cleanupErr == nil {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
// Cleanup the sandbox container if an error is returned.
if _, err := task.Delete(deferCtx, WithNRISandboxDelete(id), containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("Failed to delete sandbox container %q", id)
cleanupErr = err
}
}
}()
// wait is a long running background request, no timeout needed.
exitCh, err := task.Wait(ctrdutil.NamespacedContext())
if err != nil {
return cin, fmt.Errorf("failed to wait for sandbox container task: %w", err)
}
nric, err := nri.New()
if err != nil {
return cin, fmt.Errorf("unable to create nri client: %w", err)
}
if nric != nil {
nriSB := &nri.Sandbox{
ID: id,
Labels: config.Labels,
}
if _, err := nric.InvokeWithSandbox(ctx, task, v1.Create, nriSB); err != nil {
return cin, fmt.Errorf("nri invoke: %w", err)
}
}
if err := task.Start(ctx); err != nil {
return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)
}
pid := task.Pid()
if err := podSandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.Pid = pid
status.State = sandboxstore.StateReady
status.CreatedAt = info.CreatedAt
return status, nil
}); err != nil {
return cin, fmt.Errorf("failed to update status of pod sandbox %q: %w", id, err)
}
cin.SandboxID = id
cin.Pid = task.Pid()
cin.CreatedAt = info.CreatedAt
cin.Labels = labels
go func() {
if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh); err != nil {
log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err)
}
}()
return
}
📝 总结与展望
Containerd 创建沙箱的核心流程:CRI 层接收请求,生成 ID,预留名称,设置网络,沙箱服务层根据配置选择控制器,而控制器层执行实际的沙箱创建,通过podsandbox 模式(默认)创建 pause 容器。
📚 参考资料
https://kubernetes.io/zh-cn/docs/reference/command-line-tools-reference/kubelet/
更多推荐


所有评论(0)