Preemption抢占式调度的方法是什么

53次阅读
没有评论

共计 17809 个字符,预计需要花费 45 分钟才能阅读完成。

这篇文章主要讲解了“Preemption 抢占式调度的方法是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“Preemption 抢占式调度的方法是什么”吧!

ScheduleAlgorithm 的变化

在 Kubernetes 1.8 中,对 ScheduleAlgorithm Interface 的定义发生了改变,多了一个 Preempt(…)。因此,我在博文 Kubernetes Scheduler 原理解析(当时是基于 kubernetes 1.5)中对 scheduler 调度过程开的一句话概括“将 PodSpec.NodeName 为空的 Pods 逐个地,经过预选 (Predicates) 和优选 (Priorities) 两个步骤,挑选最合适的 Node 作为该 Pod 的 Destination。”将不再准确了。

现在应该一句话这样描述才算准确了:“将 PodSpec.NodeName 为空的 Pods 逐个地,经过预选 (Predicates) 和优选 (Priorities) 两个步骤,挑选最合适的 Node 作为该 Pod 的 Destination。如果经过预选和优选仍然没有找到合适的节点,并且启动了 Pod Priority,那么该 Pod 将会进行 Preempt 抢占式调度找到最合适的节点及需要 Evict 的 Pods。”

// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
 // Preempt receives scheduling errors for a pod and tries to create room for
 // the pod by preempting lower priority pods if possible.
 // It returns the node where preemption happened, a list of preempted pods, and error if any.
 Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
 // Predicates() returns a pointer to a map of predicate functions. This is
 // exposed for testing.
 Predicates() map[string]FitPredicate
 // Prioritizers returns a slice of priority config. This is exposed for
 // testing.
 Prioritizers() []PriorityConfig
}

Scheduler.scheduleOne 开始真正的调度逻辑,每次负责一个 Pod 的调度,逻辑如下:

从 PodQueue 中获取一个 Pod。

执行对应 Algorithm 的 Schedule,进行预选和优选。

AssumePod

Bind Pod,如果 Bind Failed,ForgetPod。

在 1.8 中,但预选和优选调度完整没有找到合适 node 时(其实一定会是预选没有找到 nodes,优选只是挑更好的),还会调用 sched.preempt 进行抢占式调度。

plugin/pkg/scheduler/scheduler.go:293
func (sched *Scheduler) scheduleOne() {pod := sched.config.NextPod()
 if pod.DeletionTimestamp != nil {sched.config.Recorder.Eventf(pod, v1.EventTypeWarning,  FailedScheduling ,  skip schedule deleting pod: %v/%v , pod.Namespace, pod.Name)
 glog.V(3).Infof(Skip schedule deleting pod: %v/%v , pod.Namespace, pod.Name)
 return
 glog.V(3).Infof(Attempting to schedule pod: %v/%v , pod.Namespace, pod.Name)
 // Synchronously attempt to find a fit for the pod.
 start := time.Now()
 suggestedHost, err := sched.schedule(pod)
 metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
 if err != nil {// schedule() may have failed because the pod would not fit on any host, so we try to
 // preempt, with the expectation that the next time the pod is tried for scheduling it
 // will fit due to the preemption. It is also possible that a different pod will schedule
 // into the resources that were preempted, but this is harmless.
 if fitError, ok := err.(*core.FitError); ok {sched.preempt(pod, fitError)
 return
 // Tell the cache to assume that a pod now is running on a given node, even though it hasn t been bound yet.
 // This allows us to keep scheduling without waiting on binding to occur.
 assumedPod := *pod
 // assume modifies `assumedPod` by setting NodeName=suggestedHost
 err = sched.assume(assumedPod, suggestedHost)
 if err != nil {
 return
 // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
 go func() {
 err := sched.bind( assumedPod,  v1.Binding{ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
 Target: v1.ObjectReference{
 Kind:  Node ,
 Name: suggestedHost,
 metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
 if err != nil {glog.Errorf( Internal error binding pod: (%v) , err)
}

Scheduler.preemt

好的,关于预选和优选,我这里不做过多解读,因为整个源码逻辑和 1.5 是一样,不同的是 1.8 增加了更多的 Predicate 和 Priority Policys 及其实现。下面只看抢占式调度 Preempt 的代码。

plugin/pkg/scheduler/scheduler.go:191
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {glog.V(3).Infof(Pod priority feature is not enabled. No preemption is performed.)
 return  , nil
 preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
 if err != nil {glog.Errorf( Error getting the updated preemptor pod object: %v , err)
 return  , err
 node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
 if err != nil {glog.Errorf( Error preempting victims to make room for %v/%v. , preemptor.Namespace, preemptor.Name)
 return  , err
 if node == nil {
 return  , err
 glog.Infof(Preempting %d pod(s) on node %v to make room for %v/%v. , len(victims), node.Name, preemptor.Namespace, preemptor.Name)
 annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
 err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
 if err != nil {glog.Errorf( Error in preemption process. Cannot update pod %v annotations: %v , preemptor.Name, err)
 return  , err
 for _, victim := range victims {if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {glog.Errorf( Error preempting pod %v/%v: %v , victim.Namespace, victim.Name, err)
 return  , err
 sched.config.Recorder.Eventf(victim, v1.EventTypeNormal,  Preempted ,  by %v/%v on node %v , preemptor.Namespace, preemptor.Name, node.Name)
 return node.Name, err
}

检查 FeaturesGate 中是否开启了 PodPriority,如果没开启,则不会进行后续 Preemption 操作;

由于该 Pod 在 Predicate/Priortiy 调度过程失败后,会更新 PodCondition,记录调度失败状态及失败原因。因此需要从 apiserver 中获取 PodCondition 更新后的 Pod Object;

调用 ScheduleAlgorithm.Preempt 进行抢占式调度,选出最佳 node 和待 preempt pods(称为 victims);

调用 apiserver 给该 pod(称为 Preemptor)打上 Annotation:NominatedNodeName=nodeName;

遍历 victims,调用 apiserver 进行逐个删除这些 pods;

注意:在 scheduler 调用 shed.schedule(pod)进行预选和优选调度失败时,Pod Bind Node 失败,该 Pod 会 requeue unscheduled Cache podqueue 中,如果在这个 pod 调度过程中又有新的 pod 加入到待调度队列,那么该 pod requeue 时它前面就有其他 pod,下一次调度就是先调度在它前面的 pod,而这些 pod 的调度有可能会调度到刚刚通过 Preempt 释放资源的 Node 上,导致把刚才 Preemptor 释放的 resource 消耗掉。当再次轮到上次的 Preemptor 调度时,可能又需要触发一次某个节点的 Preempt。

genericScheduler.Preempt

ScheduleAlgorithm.Preempt 是抢占式调度的关键实现,其对应的实现在 genericScheduler 中:

plugin/pkg/scheduler/core/generic_scheduler.go:181
// preempt finds nodes with pods that can be preempted to make room for  pod  to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
 // Scheduler may return various types of errors. Consider preemption only if
 // the error is of type FitError.
 fitError, ok := scheduleErr.(*FitError)
 if !ok || fitError == nil {
 return nil, nil, nil
 err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
 if err != nil {
 return nil, nil, err
 if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {glog.V(5).Infof(Pod %v is not eligible for more preemption. , pod.Name)
 return nil, nil, nil
 allNodes, err := nodeLister.List()
 if err != nil {
 return nil, nil, err
 if len(allNodes) == 0 {
 return nil, nil, ErrNoNodesAvailable
 potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
 if len(potentialNodes) == 0 {glog.V(3).Infof(Preemption will not help schedule pod %v on any node. , pod.Name)
 return nil, nil, nil
 nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
 if err != nil {
 return nil, nil, err
 for len(nodeToPods)   0 {node := pickOneNodeForPreemption(nodeToPods)
 if node == nil {
 return nil, nil, err
 passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
 if passes   pErr == nil {return node, nodeToPods[node], err
 if pErr != nil {glog.Errorf( Error occurred while checking extenders for preemption on node %v: %v , node, pErr)
 // Remove the node from the map and try to pick a different node.
 delete(nodeToPods, node)
 return nil, nil, err
}

sched.schedule error 检查

只有前面 sched.schedule()返回的 error 为 FitError 类型时,才会触发后续的 Preemption。FitError 就是表示 pod 在 Predicate 阶段进行某些 PredicateFunc 筛选时不通过。也就是说只有预选失败的 Pod 才会进行抢占式调度。

更新 scheduler cache 中的 NodeInfo

更新 scheduler cache 中 NodeInfo,主要是更新 Node 上 scheduled 和 Assumed Pods,作为后续 Preempt Pods 时的考虑范围,确保 Preemption 是正确的。

podEligibleToPreemptOthers 检查 pod 是否有资格进行抢占式调度

invoke podEligibleToPreemptOthers 来判断该 pod 是否适合进行后续的 Preemption,判断逻辑是:

如果该 Pod 已经包含 Annotation:NominatedNodeName=nodeName(说明该 pod 之前已经 Preempted),并且 Annotation 中的这个 Node 有比该 pod 优先级更低的 pod 正在 Terminating,则认为该 pod 不适合进行后续的 Preemption,流程结束。

除此之外,继续后续的流程。

对应代码如下:

plugin/pkg/scheduler/core/generic_scheduler.go:756
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {if nodeInfo, found := nodeNameToInfo[nodeName]; found {for _, p := range nodeInfo.Pods() {if p.DeletionTimestamp != nil   util.GetPodPriority(p)   util.GetPodPriority(pod) {
 // There is a terminating pod on the nominated node.
 return false
 return true
}

nodesWherePreemptionMightHelp 筛选出 Potential Nodes

invoke nodesWherePreemptionMightHelp 来获取 potential nodes。nodesWherePreemptionMightHelp 的逻辑是:

NodeSelectorNotMatch,

PodNotMatchHostName,

TaintsTolerationsNotMatch,

NodeLabelPresenceViolated,

NodeNotReady,

NodeNetworkUnavailable,

NodeUnschedulable,

NodeUnknownCondition

遍历所有的 nodes,对每个 nodes 在 sched.schedule()在预选阶段失败的 Predicate 策略 (failedPredicates) 进行扫描,如果 failedPredicates 包含以下 Policy,则说明该 node 不适合作为 Preempt 的备选节点。

除此之外的 Node 均作为 Potential Nodes。

对应代码如下:

func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {potentialNodes := []*v1.Node{}
 for _, node := range nodes {
 unresolvableReasonExist := false
 failedPredicates, found := failedPredicatesMap[node.Name]
 // If we assume that scheduler looks at all nodes and populates the failedPredicateMap
 // (which is the case today), the !found case should never happen, but we d prefer
 // to rely less on such assumptions in the code when checking does not impose
 // significant overhead.
 for _, failedPredicate := range failedPredicates {
 switch failedPredicate {
 case
 predicates.ErrNodeSelectorNotMatch,
 predicates.ErrPodNotMatchHostName,
 predicates.ErrTaintsTolerationsNotMatch,
 predicates.ErrNodeLabelPresenceViolated,
 predicates.ErrNodeNotReady,
 predicates.ErrNodeNetworkUnavailable,
 predicates.ErrNodeUnschedulable,
 predicates.ErrNodeUnknownCondition:
 unresolvableReasonExist = true
 break
 // TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors.
 if !found || !unresolvableReasonExist {glog.V(3).Infof(Node %v is a potential node for preemption. , node.Name)
 potentialNodes = append(potentialNodes, node)
 return potentialNodes
}

selectNodesForPreemption 和 selectVictimsOnNode 选出可行 Nodes 及其对应的 victims

invoke selectNodesForPreemption 从 Potential Nodes 中找出所有可行的 Nodes 及对应的 victim Pods,其对应的逻辑如为:启动 max(16, potentialNodesNum)个 worker(对应 goruntine)通过 WaitGroups 并发等待所有 node 的 check 完成:

遍历该 node 上所有的 scheduled pods(包括 assumed pods),将优先级比 Preemptor 更低的 Pods 都加入到 Potential victims List 中,并且将这些 victims 从 NodeInfoCopy 中删除,下次进行 Predicate 时就意味着 Node 上有更多资源可用。

对 Potential victims 中元素进行排序,排序规则是按照优先级从高到底排序的,index 为 0 的对应的优先级最高。

检查 Preemptor 是否能 scheduler 配置的所有 Predicates Policy(基于前面将这些 victims 从 NodeInfoCopy 中删除,将所有更低优先级的 pods 资源全部释放了),如果不通过则返回,表示该 node 不合适。All Predicate 通过后,继续下面流程。

遍历所有的 Potential victims list item(已经按照优先级从高到底排序),试着把 Potential victims 中第一个 Pod(优先级最高)加回到 NodeInfoCopy 中,再检查 Preemptor 是否能 scheduler 配置的所有 Predicates Policy,如果不满足就把该 pod 再从 NodeInfoCopy 中删除,并且正式加入到 victims list 中。接着对 Potential victims 中第 2,3… 个 Pod 进行同样处理。这样做,是为了保证尽量保留优先级更高的 Pods,尽量删除更少的 Pods。

最终返回每个可行 node 及其对应 victims list。

selectNodesForPreemption 代码如下,其实核心代码在 selectVictimsOnNode。

plugin/pkg/scheduler/core/generic_scheduler.go:583
func selectNodesForPreemption(pod *v1.Pod,
 nodeNameToInfo map[string]*schedulercache.NodeInfo,
 potentialNodes []*v1.Node,
 predicates map[string]algorithm.FitPredicate,
 metadataProducer algorithm.PredicateMetadataProducer,
) (map[*v1.Node][]*v1.Pod, error) {nodeNameToPods := map[*v1.Node][]*v1.Pod{}
 var resultLock sync.Mutex
 // We can use the same metadata producer for all nodes.
 meta := metadataProducer(pod, nodeNameToInfo)
 checkNode := func(i int) {nodeName := potentialNodes[i].Name
 var metaCopy algorithm.PredicateMetadata
 if meta != nil {metaCopy = meta.ShallowCopy()
 pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
 if fits {resultLock.Lock()
 nodeNameToPods[potentialNodes[i]] = pods
 resultLock.Unlock()
 workqueue.Parallelize(16, len(potentialNodes), checkNode)
 return nodeNameToPods, nil
}

plugin/pkg/scheduler/core/generic_scheduler.go:659
func selectVictimsOnNode(
 pod *v1.Pod,
 meta algorithm.PredicateMetadata,
 nodeInfo *schedulercache.NodeInfo,
 fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
 nodeInfoCopy := nodeInfo.Clone()
 removePod := func(rp *v1.Pod) {nodeInfoCopy.RemovePod(rp)
 if meta != nil {meta.RemovePod(rp)
 addPod := func(ap *v1.Pod) {nodeInfoCopy.AddPod(ap)
 if meta != nil {meta.AddPod(ap, nodeInfoCopy)
 // As the first step, remove all the lower priority pods from the node and
 // check if the given pod can be scheduled.
 podPriority := util.GetPodPriority(pod)
 for _, p := range nodeInfoCopy.Pods() {if util.GetPodPriority(p)   podPriority {potentialVictims.Items = append(potentialVictims.Items, p)
 removePod(p)
 potentialVictims.Sort()
 // If the new pod does not fit after removing all the lower priority pods,
 // we are almost done and this node is not suitable for preemption. The only condition
 // that we should check is if the  pod  is failing to schedule due to pod affinity
 // failure.
 // TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
 if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
 if err != nil {glog.Warningf( Encountered error while selecting victims on node %v: %v , nodeInfo.Node().Name, err)
 return nil, false
 victims := []*v1.Pod{}
 // Try to reprieve as many pods as possible starting from the highest priority one.
 for _, p := range potentialVictims.Items {lpp := p.(*v1.Pod)
 addPod(lpp)
 if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {removePod(lpp)
 victims = append(victims, lpp)
 glog.V(5).Infof(Pod %v is a potential preemption victim on node %v. , lpp.Name, nodeInfo.Node().Name)
 return victims, true
}

pickOneNodeForPreemption 从可行 Nodes 中找出最合适的一个 Node

如果上一步至少找到一个可行 node,则调用 pickOneNodeForPreemption 按照以下逻辑选择一个最合适的 node:

选择 victims 中最高 pod 优先级最低的那个 Node。

如果上一步有不止一个 Nodes 满足条件,则再对选择所有 victims 优先级之和最小的那个 Node。

如果上一步有不止一个 Nodes 满足条件,则再选择 victims pod 数最少的 Node。

如果上一步有不止一个 Nodes 满足条件,则再随机选择一个 Node。

以上每一步的 Nodes 列表,都是基于上一步筛选后的 Nodes。

plugin/pkg/scheduler/core/generic_scheduler.go:501
func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
 type nodeScore struct {
 node *v1.Node
 highestPriority int32
 sumPriorities int64
 numPods int
 if len(nodesToPods) == 0 {
 return nil
 minHighestPriority := int32(math.MaxInt32)
 minPriorityScores := []*nodeScore{}
 for node, pods := range nodesToPods {if len(pods) == 0 {
 // We found a node that doesn t need any preemption. Return it!
 // This should happen rarely when one or more pods are terminated between
 // the time that scheduler tries to schedule the pod and the time that
 // preemption logic tries to find nodes for preemption.
 return node
 // highestPodPriority is the highest priority among the victims on this node.
 highestPodPriority := util.GetPodPriority(pods[0])
 if highestPodPriority   minHighestPriority {
 minHighestPriority = highestPodPriority
 minPriorityScores = nil
 if highestPodPriority == minHighestPriority {minPriorityScores = append(minPriorityScores,  nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
 if len(minPriorityScores) == 1 {return minPriorityScores[0].node
 // There are a few nodes with minimum highest priority victim. Find the
 // smallest sum of priorities.
 minSumPriorities := int64(math.MaxInt64)
 minSumPriorityScores := []*nodeScore{}
 for _, nodeScore := range minPriorityScores {
 var sumPriorities int64
 for _, pod := range nodesToPods[nodeScore.node] {
 // We add MaxInt32+1 to all priorities to make all of them  = 0. This is
 // needed so that a node with a few pods with negative priority is not
 // picked over a node with a smaller number of pods with the same negative
 // priority (and similar scenarios).
 sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
 if sumPriorities   minSumPriorities {
 minSumPriorities = sumPriorities
 minSumPriorityScores = nil
 nodeScore.sumPriorities = sumPriorities
 if sumPriorities == minSumPriorities {minSumPriorityScores = append(minSumPriorityScores, nodeScore)
 if len(minSumPriorityScores) == 1 {return minSumPriorityScores[0].node
 // There are a few nodes with minimum highest priority victim and sum of priorities.
 // Find one with the minimum number of pods.
 minNumPods := math.MaxInt32
 minNumPodScores := []*nodeScore{}
 for _, nodeScore := range minSumPriorityScores {
 if nodeScore.numPods   minNumPods {
 minNumPods = nodeScore.numPods
 minNumPodScores = nil
 if nodeScore.numPods == minNumPods {minNumPodScores = append(minNumPodScores, nodeScore)
 // At this point, even if there are more than one node with the same score,
 // return the first one.
 if len(minNumPodScores)   0 {return minNumPodScores[0].node
 glog.Errorf(Error in logic of node scoring for preemption. We should never reach here!)
 return nil
}

最合适的 Node 仍然要交给 extender(if configed)检查

如果 scheduler 配置 extender scheduler,则还需要通过 invoke nodePassesExtendersForPreemption 再次将该 pod 和(假设)剔除 victims 的该 node 交给 extender.Filter 进行一下检查,只有检查通过了才返回该 node 作为最终选择的 Preempt node。

关于 extender 的理解,请参考如何对 kubernetes scheduler 进行二次开发和 Kubernetes Scheduler 源码分析。其实用的场景不多,现在支持自定义调度器了,就更少需要使用 scheduler extender 了。

感谢各位的阅读,以上就是“Preemption 抢占式调度的方法是什么”的内容了,经过本文的学习后,相信大家对 Preemption 抢占式调度的方法是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计17809字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)