✍️ Author: 茶水间Tech

🏷️ Tags: #CloudComputing #CloudNative #kubernetes #Containers

📖 Preface

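Kubernetes has many modules, a complex architecture and a very large codebase, so reading its source is hard going. This series starts from a real-world scenario, creating a Pod, and traces the resulting code flow inside Kubernetes to give an end-to-end walkthrough of the source. This installment focuses on the flow on the containerd side.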

This article is based on Client Version: v1.34.3, Server Version: v1.34.2.

📌 Overall architecture of Pod creation

💻 Main Text

📑 1. About containerd

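containerd is an efficient, reliable open-source container runtime designed to be the core container management solution from development through production.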

The containerd ecosystem includes a range of tools and components that integrate with it, extending containerd's functionality and broadening where it can be used.

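  • CRI plugin: integrates tightly with Kubernetes; by implementing the Container Runtime Interface (CRI), it lets Kubernetes manage containers.
  • CNI plugins: Container Network Interface (CNI) plugins handle networking and give containers network connectivity; the CRI plugin invokes them when it sets up the Pod network.
  • CSI plugins: Container Storage Interface (CSI) plugins handle storage management so that containers can mount and use volumes. Note that CSI drivers are invoked by kubelet, not by containerd; containerd only mounts the volume paths that kubelet has prepared.
  • Image management: supports Docker images and the OCI image specification, and can pull, store and manage container images from registries.
  • Plugin mechanism: containerd's functionality can be extended through plugins to meet specific needs.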

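In the Kubernetes scenario, containerd serves as the runtime through its built-in CRI plugin: by implementing the Container Runtime Interface (CRI), it lets Kubernetes (via kubelet) manage containers.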

📑 2. Code Analysis
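RunPodSandbox (CRI layer)
    ↓
CreateSandbox (sandbox_service.go)
    ↓
Create (podsandbox.Controller) - create the metadata
    ↓
StartSandbox (sandbox_service.go)
    ↓
Start (podsandbox.Controller) - start the sandbox
    ↓
client.NewContainer - create the container
    ↓
container.NewTask - create the task
    ↓
task.Start - start the process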
2.1 Creating the sandbox in containerd: RunPodSandbox (sandbox_run.go)

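[RunPodSandbox] implements the RunPodSandbox API of the CRI (Container Runtime Interface), which creates and starts a Pod sandbox. The Pod sandbox is the environment in which the Pod's containers run and holds resources such as the network namespace and cgroups. It is one of the core interfaces through which Kubernetes interacts with the container runtime over CRI.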

Code path: containerd/internal/cri/server/sandbox_run.go

func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
	// ... (omitted)
	// Generate a unique sandbox ID
	id := util.GenerateID()
	// ... (omitted)
	// Reserve the sandbox name so the same sandbox cannot be created concurrently
	if err := c.sandboxNameIndex.Reserve(name, id); err != nil {
		return nil, fmt.Errorf("failed to reserve sandbox name %q: %w", name, err)
	}
	defer func() {
		// Release the name if the function returns with an error.
		// When cleanupErr != nil, the name will be cleaned in sandbox_remove.
		if retErr != nil && cleanupErr == nil {
			c.sandboxNameIndex.ReleaseByName(name)
		}
	}()
	// Create a lease (used for resource management and garbage collection)
	leaseSvc := c.client.LeasesService()
	ls, lerr := leaseSvc.Create(ctx, leases.WithID(id))
	// ... (omitted)

	// Setup the network namespace if host networking wasn't requested.
	// Network namespace creation (key step)
	if !hostNetwork(config) {
		span.AddEvent("setup pod network")
		netStart := time.Now()
		// If it is not in host network namespace then create a namespace and set the sandbox
		// handle. NetNSPath in sandbox metadata and NetNS is non empty only for non host network
		// namespaces. If the pod is in host network namespace then both are empty and should not
		// be used.
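		// Create the network namespace; it is mounted under /var/run/netns by default
		var netnsMountDir = "/var/run/netns"
		if c.config.NetNSMountsUnderStateDir {
			netnsMountDir = filepath.Join(c.config.StateDir, "netns")
		}
		// Choose how to create it depending on whether user namespaces are enabled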
		if !userNsEnabled {
			sandbox.NetNS, err = netns.NewNetNS(netnsMountDir)
		} else {
			usernsOpts := config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetUsernsOptions()
			sandbox.NetNS, err = c.setupNetnsWithinUserns(netnsMountDir, usernsOpts)
		}
		if err != nil {
			return nil, fmt.Errorf("failed to create network namespace for sandbox %q: %w", id, err)
		}
		// Update network namespace in the store, which is used to generate the container's spec
		sandbox.NetNSPath = sandbox.NetNS.GetPath()
		// ... (omitted)
		// Set up the Pod network
		if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
			return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
		}
		sandboxCreateNetworkTimer.UpdateSince(netStart)

		if err := sandboxInfo.AddExtension(podsandbox.MetadataKey, &sandbox.Metadata); err != nil {
			return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
		}

		// Save sandbox metadata to store
		if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
			return nil, fmt.Errorf("unable to save sandbox %q to sandbox store: %w", id, err)
		}
	}
	// Create the sandbox (delegated to the sandbox controller)
	if err := c.sandboxService.CreateSandbox(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil {
		return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err)
	}
	// Start the sandbox
	ctrl, err := c.sandboxService.StartSandbox(ctx, sandbox.Sandboxer, id)
	// ... (omitted)
}
2.2 Creating the sandbox: CreateSandbox (sandbox_service.go)

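[CreateSandbox] is one of the core methods of the CRI sandbox service and is responsible for creating the Pod sandbox. It acts as an adapter/wrapper that delegates the CRI layer's sandbox-creation request to a concrete sandbox controller implementation. It supports multiple sandbox types (such as Pod sandboxes and VM sandboxes) and picks the appropriate controller through the Sandboxer field.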

Code path: containerd/internal/cri/server/sandbox_service.go

func (c *criSandboxService) CreateSandbox(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error {
	ctrl, err := c.SandboxController(info.Sandboxer)
	if err != nil {
		return err
	}
	return ctrl.Create(ctx, info, opts...)
}
2.3 Creating the sandbox: Create (controller_service.go)

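The nginx Pod in this article is an ordinary runc-based Pod. Ordinary Pods use the podsandbox controller rather than the shim sandboxer: the controller directly creates a pause container to serve as the sandbox (the pause container's task itself still runs under the runc shim, containerd-shim-runc-v2).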

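When the shim sandboxer is used
  1. It is explicitly configured: sandboxer = "shim"
  2. VM sandbox scenarios, such as Kata Containers or Firecracker
  3. Stronger isolation is required: a separate shim process provides the isolation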

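The Create method is one of the core methods of [podsandbox.Controller]; it is responsible for creating the metadata record of the Pod sandbox.

Key characteristics

  • It only creates metadata; no actual container or process is created
  • Deferred creation: the actual pause container is created later, in the [Start] method
  • State initialization: the sandbox state is set to [sandboxstore.StateUnknown]

This design follows a "create-start separation" pattern, which leaves room for configuration and validation between creation and startup.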

Code path: containerd/internal/cri/server/podsandbox/sandbox_run.go

func (c *Controller) Create(_ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error {
	metadata := sandboxstore.Metadata{}
	if err := info.GetExtension(MetadataKey, &metadata); err != nil {
		return fmt.Errorf("failed to get sandbox %q metadata: %w", info.ID, err)
	}
	podSandbox := types.NewPodSandbox(info.ID, sandboxstore.Status{State: sandboxstore.StateUnknown})
	podSandbox.Metadata = metadata
	podSandbox.Runtime = info.Runtime
	return c.store.Save(podSandbox)
}
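The create-start separation can be illustrated with a tiny status store: Create only records a sandbox in StateUnknown, and Start later moves it to StateReady through an update closure, similar in shape to the podSandbox.Status.Update call in Controller.Start. All types here (`State`, `Status`, `StatusStorage`, `store`) are hypothetical simplifications of containerd's sandboxstore, and Start takes the PID as a parameter instead of actually launching a pause container.

```go
package main

import (
	"fmt"
	"sync"
)

// State is a hypothetical, reduced sandbox state.
type State int

const (
	StateUnknown State = iota
	StateReady
)

func (s State) String() string { return [...]string{"Unknown", "Ready"}[s] }

// Status is a simplified sandbox status.
type Status struct {
	State State
	Pid   uint32
}

// StatusStorage guards a Status; Update mirrors the closure style of
// podSandbox.Status.Update.
type StatusStorage struct {
	mu     sync.Mutex
	status Status
}

func (s *StatusStorage) Get() Status {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.status
}

// Update applies fn under the lock; if fn fails, the old status is kept.
func (s *StatusStorage) Update(fn func(Status) (Status, error)) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	next, err := fn(s.status)
	if err != nil {
		return err
	}
	s.status = next
	return nil
}

// store holds sandbox records keyed by ID.
type store struct {
	mu   sync.Mutex
	byID map[string]*StatusStorage
}

// Create only records metadata in StateUnknown; no process exists yet.
func (st *store) Create(id string) {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.byID[id] = &StatusStorage{status: Status{State: StateUnknown}}
}

// Start would create and start the pause container; this sketch only
// receives its PID and marks the sandbox Ready.
func (st *store) Start(id string, pid uint32) error {
	st.mu.Lock()
	s, ok := st.byID[id]
	st.mu.Unlock()
	if !ok {
		return fmt.Errorf("unable to find pod sandbox with id %q", id)
	}
	return s.Update(func(old Status) (Status, error) {
		old.Pid = pid
		old.State = StateReady
		return old, nil
	})
}

func main() {
	st := &store{byID: map[string]*StatusStorage{}}
	st.Create("sb-1")
	fmt.Println(st.byID["sb-1"].Get().State) // Unknown
	if err := st.Start("sb-1", 1234); err != nil {
		panic(err)
	}
	s := st.byID["sb-1"].Get()
	fmt.Println(s.State, s.Pid) // Ready 1234
}
```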
2.4 Starting the sandbox: StartSandbox (sandbox_service.go)

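Like CreateSandbox, it looks up the controller for the given sandboxer and calls its Start method to start the sandbox.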

Code path: containerd/internal/cri/server/sandbox_service.go

func (c *criSandboxService) StartSandbox(ctx context.Context, sandboxer string, sandboxID string) (sandbox.ControllerInstance, error) {
	ctrl, err := c.SandboxController(sandboxer)
	if err != nil {
		return sandbox.ControllerInstance{}, err
	}
	return ctrl.Start(ctx, sandboxID)
}

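[Start] is the core method of podsandbox.Controller; it creates and starts the Pod sandbox container (the pause container).

Main functions

  • Create the pause container that serves as the Pod's network and namespace sandbox
  • Configure the OCI runtime specification (spec)
  • Create the containerd container and task
  • Start the sandbox process
  • Set up exit monitoring and cleanup mechanisms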

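Key characteristics

  • Uses the containerd client directly instead of the shim sandbox controller (the pause container's task still runs under the runtime's shim)
  • Complete resource management, covering directories, the container, the task, files and more
  • Robust error handling: multiple layers of defer ensure that resources are cleaned up
  • State management: updates the sandbox state from Unknown to Ready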

Code path: containerd/internal/cri/server/podsandbox/sandbox_run.go

func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.ControllerInstance, retErr error) {
	var cleanupErr error
	defer func() {
		if retErr != nil && cleanupErr != nil {
			log.G(ctx).WithField("id", id).WithError(cleanupErr).Errorf("failed to fully teardown sandbox resources after earlier error: %s", retErr)
			retErr = errors.Join(retErr, CleanupErr{cleanupErr})
		}
	}()
	podSandbox := c.store.Get(id)
	if podSandbox == nil {
		return cin, fmt.Errorf("unable to find pod sandbox with id %q: %w", id, errdefs.ErrNotFound)
	}
	metadata := podSandbox.Metadata

	var (
		config = metadata.Config
		labels = map[string]string{}
	)

	sandboxImage := c.getSandboxImageName()
	// Ensure sandbox container image snapshot.
	// Make sure the sandbox (pause) image is available
	image, err := c.ensureImageExists(ctx, sandboxImage, config, metadata.RuntimeHandler)
	if err != nil {
		return cin, fmt.Errorf("failed to get sandbox image %q: %w", sandboxImage, err)
	}

	containerdImage, err := c.toContainerdImage(ctx, *image)
	if err != nil {
		return cin, fmt.Errorf("failed to get image from containerd %q: %w", image.ID, err)
	}

	ociRuntime, err := c.config.GetSandboxRuntime(config, metadata.RuntimeHandler)
	if err != nil {
		return cin, fmt.Errorf("failed to get sandbox runtime: %w", err)
	}
	log.G(ctx).WithField("podsandboxid", id).Debugf("use OCI runtime %+v", ociRuntime)

	labels["oci_runtime_type"] = ociRuntime.Type

	// Create sandbox container root directories.
	// Create the persistent and volatile sandbox root directories
	sandboxRootDir := c.getSandboxRootDir(id)
	if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil {
		return cin, fmt.Errorf("failed to create sandbox root directory %q: %w",
			sandboxRootDir, err)
	}
	defer func() {
		if retErr != nil && cleanupErr == nil {
			// Cleanup the sandbox root directory.
			if cleanupErr = c.os.RemoveAll(sandboxRootDir); cleanupErr != nil {
				log.G(ctx).WithError(cleanupErr).Errorf("Failed to remove sandbox root directory %q",
					sandboxRootDir)
			}
		}
	}()

	volatileSandboxRootDir := c.getVolatileSandboxRootDir(id)
	if err := c.os.MkdirAll(volatileSandboxRootDir, 0755); err != nil {
		return cin, fmt.Errorf("failed to create volatile sandbox root directory %q: %w",
			volatileSandboxRootDir, err)
	}
	defer func() {
		if retErr != nil && cleanupErr == nil {
			deferCtx, deferCancel := ctrdutil.DeferContext()
			defer deferCancel()
			// Cleanup the volatile sandbox root directory.
			if cleanupErr = ensureRemoveAll(deferCtx, volatileSandboxRootDir); cleanupErr != nil {
				log.G(ctx).WithError(cleanupErr).Errorf("Failed to remove volatile sandbox root directory %q",
					volatileSandboxRootDir)
			}
		}
	}()

	// Create sandbox container.
	// NOTE: sandboxContainerSpec SHOULD NOT have side
	// effect, e.g. accessing/creating files, so that we can test
	// it safely.
	// Generate the OCI spec
	spec, err := c.sandboxContainerSpec(id, config, &image.ImageSpec.Config, metadata.NetNSPath, ociRuntime.PodAnnotations)
	if err != nil {
		return cin, fmt.Errorf("failed to generate sandbox container spec: %w", err)
	}

	log.G(ctx).WithField("podsandboxid", id).Debugf("sandbox container spec: %#+v", spew.NewFormatter(spec))
	// Handle SELinux labels and privileged containers
	metadata.ProcessLabel = spec.Process.SelinuxLabel
	defer func() {
		if retErr != nil {
			selinux.ReleaseLabel(metadata.ProcessLabel)
		}
	}()
	labels["selinux_label"] = metadata.ProcessLabel

	// handle any KVM based runtime
	if err := modifyProcessLabel(ociRuntime.Type, spec); err != nil {
		return cin, err
	}
	
	if config.GetLinux().GetSecurityContext().GetPrivileged() {
		// If privileged don't set selinux label, but we still record the MCS label so that
		// the unused label can be freed later.
		spec.Process.SelinuxLabel = ""
		// If privileged is enabled, sysfs should have the rw attribute
		for i, k := range spec.Mounts {
			if filepath.Clean(k.Destination) == "/sys" {
				for j, v := range spec.Mounts[i].Options {
					if v == "ro" {
						spec.Mounts[i].Options[j] = "rw"
						break
					}
				}
				break
			}
		}
	}

	// Generate spec options that will be applied to the spec later.
	specOpts, err := c.sandboxContainerSpecOpts(config, &image.ImageSpec.Config)
	if err != nil {
		return cin, fmt.Errorf("failed to generate sandbox container spec options: %w", err)
	}

	sandboxLabels := ctrdutil.BuildLabels(config.Labels, image.ImageSpec.Config.Labels, crilabels.ContainerKindSandbox)

	snapshotterOpt := []snapshots.Opt{snapshots.WithLabels(snapshots.FilterInheritedLabels(config.Annotations))}
	extraSOpts, err := sandboxSnapshotterOpts(config)
	if err != nil {
		return cin, err
	}
	snapshotterOpt = append(snapshotterOpt, extraSOpts...)

	opts := []containerd.NewContainerOpts{
		containerd.WithSnapshotter(c.imageService.RuntimeSnapshotter(ctx, ociRuntime)),
		customopts.WithNewSnapshot(id, containerdImage, snapshotterOpt...),
		containerd.WithSpec(spec, specOpts...),
		containerd.WithContainerLabels(sandboxLabels),
		containerd.WithContainerExtension(crilabels.SandboxMetadataExtension, &metadata),
		containerd.WithRuntime(ociRuntime.Type, podSandbox.Runtime.Options),
	}
	// Create the containerd container
	container, err := c.client.NewContainer(ctx, id, opts...)
	if err != nil {
		return cin, fmt.Errorf("failed to create containerd container: %w", err)
	}
	podSandbox.Container = container
	defer func() {
		if retErr != nil && cleanupErr == nil {
			deferCtx, deferCancel := ctrdutil.DeferContext()
			defer deferCancel()
			if cleanupErr = container.Delete(deferCtx, containerd.WithSnapshotCleanup); cleanupErr != nil {
				log.G(ctx).WithError(cleanupErr).Errorf("Failed to delete containerd container %q", id)
			}
			podSandbox.Container = nil
		}
	}()

	// Setup files required for the sandbox.
	// Set up sandbox files
	if err = c.setupSandboxFiles(id, config); err != nil {
		return cin, fmt.Errorf("failed to setup sandbox files: %w", err)
	}
	defer func() {
		if retErr != nil && cleanupErr == nil {
			if cleanupErr = c.cleanupSandboxFiles(id, config); cleanupErr != nil {
				log.G(ctx).WithError(cleanupErr).Errorf("Failed to cleanup sandbox files in %q",
					sandboxRootDir)
			}
		}
	}()

	// Update sandbox created timestamp.
	info, err := container.Info(ctx)
	if err != nil {
		return cin, fmt.Errorf("failed to get sandbox container info: %w", err)
	}

	// Create sandbox task in containerd.
	log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name)

	var taskOpts []containerd.NewTaskOpts
	if ociRuntime.Path != "" {
		taskOpts = append(taskOpts, containerd.WithRuntimePath(ociRuntime.Path))
	}

	// We don't need stdio for sandbox container.
	// Create and start the task
	task, err := container.NewTask(ctx, containerdio.NullIO, taskOpts...)
	if err != nil {
		return cin, fmt.Errorf("failed to create containerd task: %w", err)
	}
	defer func() {
		if retErr != nil && cleanupErr == nil {
			deferCtx, deferCancel := ctrdutil.DeferContext()
			defer deferCancel()
			// Cleanup the sandbox container if an error is returned.
			if _, err := task.Delete(deferCtx, WithNRISandboxDelete(id), containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
				log.G(ctx).WithError(err).Errorf("Failed to delete sandbox container %q", id)
				cleanupErr = err
			}
		}
	}()

	// wait is a long running background request, no timeout needed.
	exitCh, err := task.Wait(ctrdutil.NamespacedContext())
	if err != nil {
		return cin, fmt.Errorf("failed to wait for sandbox container task: %w", err)
	}

	nric, err := nri.New()
	if err != nil {
		return cin, fmt.Errorf("unable to create nri client: %w", err)
	}
	if nric != nil {
		nriSB := &nri.Sandbox{
			ID:     id,
			Labels: config.Labels,
		}
		if _, err := nric.InvokeWithSandbox(ctx, task, v1.Create, nriSB); err != nil {
			return cin, fmt.Errorf("nri invoke: %w", err)
		}
	}

	if err := task.Start(ctx); err != nil {
		return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)
	}
	pid := task.Pid()
	if err := podSandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
		status.Pid = pid
		status.State = sandboxstore.StateReady
		status.CreatedAt = info.CreatedAt
		return status, nil
	}); err != nil {
		return cin, fmt.Errorf("failed to update status of pod sandbox %q: %w", id, err)
	}

	cin.SandboxID = id
	cin.Pid = task.Pid()
	cin.CreatedAt = info.CreatedAt
	cin.Labels = labels

	go func() {
		if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh); err != nil {
			log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err)
		}
	}()

	return
}
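The privileged branch in Start turns the /sys mount from read-only into read-write by rewriting the first "ro" option in place. The loop can be lifted into a standalone function for testing. The `Mount` struct here is a hypothetical stand-in that keeps only the two fields the code touches (containerd itself uses the OCI runtime-spec Mount type), and `makeSysfsWritable` is a name chosen for this sketch.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// Mount keeps only the Destination and Options fields of the OCI
// runtime-spec Mount type; other fields are omitted in this sketch.
type Mount struct {
	Destination string
	Options     []string
}

// makeSysfsWritable rewrites the first "ro" option of the first /sys mount
// to "rw", following the privileged branch of Controller.Start.
// It modifies mounts in place.
func makeSysfsWritable(mounts []Mount) {
	for i, m := range mounts {
		if filepath.Clean(m.Destination) == "/sys" {
			for j, opt := range mounts[i].Options {
				if opt == "ro" {
					mounts[i].Options[j] = "rw"
					break
				}
			}
			break // only the first /sys mount is considered
		}
	}
}

func main() {
	mounts := []Mount{
		{Destination: "/proc", Options: []string{"nosuid", "noexec", "nodev"}},
		{Destination: "/sys/", Options: []string{"nosuid", "noexec", "nodev", "ro"}},
	}
	makeSysfsWritable(mounts)
	fmt.Println(mounts[1].Options) // [nosuid noexec nodev rw]
}
```

filepath.Clean normalizes destinations such as "/sys/" so that trailing slashes do not defeat the match.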

📝 Summary and Outlook

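The core flow by which containerd creates a sandbox: the CRI layer receives the request, generates an ID, reserves the name and sets up the network; the sandbox service layer selects a controller based on the configuration; and the controller layer performs the actual sandbox creation, which in podsandbox mode (the default) means creating a pause container.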
