inductor's blog

nothing but self note :)

Kubernetesのリソース要求/制限(requests/limits)の内部処理をソースコードレベルで読み解く

はじめに

この記事は筆者がKubeCon EU 2021にて発表した「Resource Requests and Limits Under the Hood: The Journey of a Pod Spec」の内容を中心とし、ブログ向けにまとめなおしたものです。

www.youtube.com

日本語でこのリソース要求/制限について内部の仕組みまで踏まえて詳細に書かれた記事はあまりないので、誰かの助けになれば幸いです。

Kubernetesにおけるリソースの要求と制限

Kubernetes上でアプリケーションを実行する際、ワークロードの特性に応じて以下のような形で必要なリソースを定義しておくことができます。

apiVersion: v1
kind: Pod
metadata:
  name: frontend
spec:
  containers:
  - name: app
    image: images.my-company.example/app:v4
    resources:
      requests:
        memory: "64Mi"
        cpu: "250m"
      limits:
        memory: "128Mi"
        cpu: "500m"

このとき、「requests」はこのPodが最低限必要とするCPU/メモリであり、「limits」はVMのようにリソース使用量に上限を設けたい場合に設定されます。この値は開発者や運用者がkubectl applyを行ったタイミングでKubernetesのAPIサーバーを経由してetcdに書き込まれ、新規作成対象のPod specとなります。

スケジューラーによるスコアリング

新規作成対象のPodはKubernetesのスケジューラーの制御ループによって検知され、クラスター内に存在するノードの中から利用可能なリソース量を持つものがフィルタリングされ、さらに複数のパラメーターから(スコアリングの結果)合致度の高いものが任意に選ばれ、etcdのPod specに配置されるべきノードの情報が書き込まれます。スケジューラーのフィルタリングやスコアリングの詳細についてはここでは明記しませんが、公式ドキュメントAlibabaの人が書いたQiita記事などを読むと雰囲気がわかりそうです。

ノードでのPod作成処理

配置されるべきノードが決定されると、今度は該当ノードにその情報が通知される必要がありますね。Kubernetesの内部処理に「通知」というものは存在せず、ここではkubeletが定期的にAPIサーバーに問い合わせて自分に割り当てられたPodに変更がなかったかを確認する制御ループを見ていくことになります。

Kubernetes v1.22では以下のリンクにある処理を見ていきます。

https://github.com/kubernetes/kubernetes/blob/release-1.22/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L725

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
}

該当行のすぐ上にあるコメントが非常に優秀で分かりやすいのですが、ここでは以下のようなステップで処理が実行されていることがわかります。

  1. sandboxとコンテナへの変更を計算
  2. killすべきsandboxがあればkill
  3. 実行対象から外れたコンテナをkill
  4. 作成すべきsandboxがあれば作成
  5. 作成すべき揮発コンテナがあれば作成
  6. 作成すべきinitコンテナがあれば作成
  7. 作成すべきコンテナがあれば作成

ここでは一般的なPodのリソース要求、制限について見ていきたいので、step 7の処理を深堀りしていきます。↓だけ見ても何もわからねえなと思いますが、実際にはstartというhelper関数に必要な処理が書かれているので、ここからちょっと上に飛びます。

https://github.com/kubernetes/kubernetes/blob/release-1.22/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L927

   // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }

https://github.com/kubernetes/kubernetes/blob/release-1.22/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L873

   start := func(typeName, metricLabel string, spec *startSpec) error {
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
            return err
        }

        metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
        klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            // startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
            // useful to cluster administrators to distinguish "server errors" from "user errors".
            metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
            default:
                utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
            }
            return err
        }

        return nil
    }

ここでstartContainerという関数が呼ばれているのでkuberuntime_container.goに飛んで処理を読んでみます。関数の中にあるstep 2でコンテナの作成処理が行われています。

https://github.com/kubernetes/kubernetes/blob/release-1.22/pkg/kubelet/kuberuntime/kuberuntime_container.go#L184

func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    container := spec.container
...
    // Step 2: create the container.
    // For a new container, the RestartCount should be 0
    restartCount := 0
    containerStatus := podStatus.FindContainerStatusByName(container.Name)
    if containerStatus != nil {
        restartCount = containerStatus.RestartCount + 1
    } else {
        // The container runtime keeps state on container statuses and
        // what the container restart count is. When nodes are rebooted
        // some container runtimes clear their state which causes the
        // restartCount to be reset to 0. This causes the logfile to
        // start at 0.log, which either overwrites or appends to the
        // already existing log.
        //
        // We are checking to see if the log directory exists, and find
        // the latest restartCount by checking the log name -
        // {restartCount}.log - and adding 1 to it.
        logDir := BuildContainerLogsDirectory(pod.Namespace, pod.Name, pod.UID, container.Name)
        restartCount, err = calcRestartCountByLogDir(logDir)
        if err != nil {
            klog.InfoS("Log directory exists but could not calculate restartCount", "logDir", logDir, "err", err)
        }
    }
...
    containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)
    if cleanupAction != nil {
        defer cleanupAction()
    }
...

generateContainerConfigの処理でPodからコンテナを作成するためのConfigを設定していそうです。中身を色々覗いていたらkuberuntime_container_linux.goでrequests/limitsを設定している箇所にたどり着きました。

https://github.com/kubernetes/kubernetes/blob/release-1.22/pkg/kubelet/kuberuntime/kuberuntime_container_linux.go#L63

   // set linux container resources
    var cpuShares int64
    cpuRequest := container.Resources.Requests.Cpu()
    cpuLimit := container.Resources.Limits.Cpu()
    memoryLimit := container.Resources.Limits.Memory().Value()
    memoryRequest := container.Resources.Requests.Memory().Value()
    oomScoreAdj := int64(qos.GetContainerOOMScoreAdjust(pod, container,
        int64(m.machineInfo.MemoryCapacity)))
    // If request is not specified, but limit is, we want request to default to limit.
    // API server does this for new containers, but we repeat this logic in Kubelet
    // for containers running on existing Kubernetes clusters.
    if cpuRequest.IsZero() && !cpuLimit.IsZero() {
        cpuShares = milliCPUToShares(cpuLimit.MilliValue())
    } else {
        // if cpuRequest.Amount is nil, then milliCPUToShares will return the minimal number
        // of CPU shares.
        cpuShares = milliCPUToShares(cpuRequest.MilliValue())
    }
    lc.Resources.CpuShares = cpuShares
    if memoryLimit != 0 {
        lc.Resources.MemoryLimitInBytes = memoryLimit
    }
    // Set OOM score of the container based on qos policy. Processes in lower-priority pods should
    // be killed first if the system runs out of memory.
    lc.Resources.OomScoreAdj = oomScoreAdj

ここでcpuShares = milliCPUToShares(cpuLimit.MilliValue())とありますが、ここではPod specに設定されたコア数ベースのCPUリソース値をCFSで使うperiodの値に変換しています。

コンピューターでプログラムを実行する際、CPUは一定の周期で命令のサイクルを持つわけですが、CPUが一定周期に持つ全体の命令サイクルの中でどのプロセスにどのくらい割り当てるかを決めるためには実行時間を指定して上げる必要があるためこのような変換処理が必要です。

f:id:inductor:20211024131911p:plain

ここを感覚的に理解できるようになるには、恐らく制御工学の一部を勉強するか、マイコンをアセンブラでいじったりしてみないと分かりづらいかもしれません。CPUの周波数が高いほどシングルスレッド性能が高いのは分かると思いますがここでは深く説明しません。

で、このmilliCPUToSharesの中身を見るとhelpers_linux.goの中で以下のような処理が行われていることがわかります。

// milliCPUToShares converts milliCPU to CPU shares
func milliCPUToShares(milliCPU int64) int64 {
    if milliCPU == 0 {
        // Return 2 here to really match kernel default for zero milliCPU.
        return minShares
    }
    // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
    shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
    if shares < minShares {
        return minShares
    }
    return shares
}

// milliCPUToQuota converts milliCPU to CFS quota and period values
func milliCPUToQuota(milliCPU int64, period int64) (quota int64) {
    // CFS quota is measured in two values:
    //  - cfs_period_us=100ms (the amount of time to measure usage across)
    //  - cfs_quota=20ms (the amount of cpu time allowed to be used across a period)
    // so in the above example, you are limited to 20% of a single CPU
    // for multi-cpu environments, you just scale equivalent amounts
    // see https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt for details
    if milliCPU == 0 {
        return
    }

    // we then convert your milliCPU to a value normalized over a period
    quota = (milliCPU * period) / milliCPUToCPU

    // quota needs to be a minimum of 1ms.
    if quota < minQuotaPeriod {
        quota = minQuotaPeriod
    }

    return
}

ここでsharesとquotaはそれぞれcgroupのパラメーターのことで、LinuxのCFSアルゴリズムを使ってタスクのスケジューリングをCPUに対して行うときの優先付けなどで利用されています。

このへんはZ Labの単細胞さん(Twitterの名前そのまま)が前に発表してた資料が最高にわかりやすいです。

speakerdeck.com

なお、cgroupを使ったこの辺の処理はかつてGoogleが作っていたOSSから引き継がれた思想で、現在の主流であるDockerを始めとしたコンテナランタイム内部でも引き継がれて使われ続けているようです。

github.com

ここでKubeletの処理に戻ってみると、requests/limitsの値はkubeletの中でcgroupで扱えるような値に内部的に変換され、実際にコンテナを作成するために使う設定としてこのあと使われていきます。

https://github.com/kubernetes/kubernetes/blob/release-1.22/staging/src/k8s.io/cri-api/pkg/apis/services.go#L35

type ContainerManager interface {
    // CreateContainer creates a new container in specified PodSandbox.
    CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
...
}

もろもろの処理を経て、kubeletでは最終的にgRPC経由でCRIランタイムに対してCreateContainer命令をContainerConfig付きで渡すところまでやってきました。ここからはコンテナランタイムの処理を見ていく必要があります。

さて、この時点で既にそれなりに複雑な処理が複数のコンポーネントにわたって行われており、初めてこのへんを読む場合は混乱してしまいそうです。そこで、この時点で全体の流れを整理してみましょう。

Requests全体の流れ

f:id:inductor:20211024133400p:plain

Requestsは値が設定されてからスケジューラーで割り当てるべきノードの評価に使われ、そこからアサインされたノードに処理が渡っていくということがわかっています。

Limits全体の流れ

f:id:inductor:20211024133502p:plain

Limitsは値が設定されたあとKubeletによって値がLinuxのcgroups向けに変換され、そのあとコンテナランタイム経由でLinuxのシステムコールを呼ぶまで引き回していく形になります。

CRI内部の処理

CRIランタイムではgRPCサーバーがプロセスとして立ち上がっており、kubeletからのリクエストを待ち受けています。

CreateContainer命令が呼び出されると、container_create.goの中にあるエンドポイントの命令が実行されます。

func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (_ *runtime.CreateContainerResponse, retErr error) {
}

このあと、CRIランタイムにおけるオブジェクトとしてのコンテナが作られたあと、我々の知る「コンテナ」としての実態であるTaskが作成されていきます。

https://github.com/containerd/containerd/blob/main/container.go#L298

func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
...
    response, err := c.client.TaskService().Create(ctx, request)
}

このCreate処理は内部的にOCIランタイムを呼んでいることから、ここからさきはOCIランタイムの責務になっていきます。この関数の中に渡されるパラメーターの中には、OCIランタイムに渡すべきOCI specのJSONスキーマなどが格納されます。

OCI内部の処理

ようやくruncまで来ました。

github.com

func (l *LinuxFactory) Create(id string, config *configs.Config) (Container, error) {
...
    c := &linuxContainer{
        id:            id,
        root:          containerRoot,
        config:        config,
        initPath:      l.InitPath,
        initArgs:      l.InitArgs,
        criuPath:      l.CriuPath,
        newuidmapPath: l.NewuidmapPath,
        newgidmapPath: l.NewgidmapPath,
        cgroupManager: l.NewCgroupsManager(config.Cgroups, nil),
    }

この構造体ポインタがオブジェクトのインスタンスを作るみたいに多分なっている(golangなんもわからん)はずで、つまりここで実際にコンテナが作成されます。

で、cgroupsのコンフィグをセットしている処理を見てみると、libcontainer/cgroups/systemd/v2.goのところにこんなのがありました。

       case "cpu.max":
            // value: quota [period]
            quota := int64(0) // 0 means "unlimited" for addCpuQuota, if period is set
            period := defCPUQuotaPeriod
            sv := strings.Fields(v)
            if len(sv) < 1 || len(sv) > 2 {
                return nil, fmt.Errorf("unified resource %q value invalid: %q", k, v)
            }
            // quota
            if sv[0] != "max" {
                quota, err = strconv.ParseInt(sv[0], 10, 64)
                if err != nil {
                    return nil, fmt.Errorf("unified resource %q period value conversion error: %w", k, err)
                }
            }
            // period
            if len(sv) == 2 {
                period, err = strconv.ParseUint(sv[1], 10, 64)
                if err != nil {
                    return nil, fmt.Errorf("unified resource %q quota value conversion error: %w", k, err)
                }
            }
            addCpuQuota(cm, &props, quota, period)

正直runcの細かい動きがあんまりわかってないので雰囲気ですが、とりあえず引き回されたvalueはこのような形でcgroupsに渡されてセットされているというのがわかりました。なのでまあ今回の目標としては達成できているかな(??)

まとめ

というわけで、全体の流れをまとめると以下のようになります。

  1. 開発者が作ったPod specがkube-apiserver経由でetcdにストアされる
  2. kube-schedulerが新規登録されたPod specを拾って、それを割り当てるためのノードを評価した上でetcdに書き込む
  3. kubeletSyncPod制御ループが新規作成すべきPodを検知すると、requests/limitsの値をCFS向けに書き換えた上でCRIランタイムに渡す
  4. CRIランタイムがgRPC命令を検知すると、設定された値をOCIランタイムに渡してバイナリを実行
  5. OCIランタイムがセットされた値に基づいてcgroupsに設定を書き込む

実装の細かい部分はある程度端折ってる部分もありますが、気になった方は自分でも調べてみると楽しいと思います。

今日はこのへんで~。