Kubernetes Scheduler的优先级队列是什么

60次阅读
没有评论

共计 12458 个字符,预计需要花费 32 分钟才能阅读完成。

这篇文章主要讲解了“Kubernetes Scheduler 的优先级队列是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“Kubernetes Scheduler 的优先级队列是什么”吧!

从 Kubernetes 1.8 开始,Scheduler 提供了基于 Pod Priorty 的抢占式调度,我在解析 Kubernetes 1.8 中的基于 Pod 优先级的抢占式调度和 Kubernetes 1.8 抢占式调度 Preemption 源码分析中对此做过深入分析。但这还不够,当时调度队列只有 FIFO 类型,并不支持优先级队列,这会导致 High Priority Pod 抢占 Lower Priority Pod 后再次进入 FIFO 队列中排队,经常会导致抢占的资源被队列前面的 Lower Priority Pod 占用,导致 High Priority Pod Starvation 的问题。为了减轻这一问题,从 Kubernetes 1.9 开始提供 Pod 优先级的调度队列,即 PriorityQueue,这同样需要用户打开 PodPriority 这个 Feature Gate。

PriorityQueuePriorityQueue Struct

先看看 PriorityQueue 的结构定义。

type PriorityQueue struct {
 lock sync.RWMutex
 cond sync.Cond
 activeQ *Heap
 unschedulableQ *UnschedulablePodsMap
 nominatedPods map[string][]*v1.Pod
 receivedMoveRequest bool
}

activeQ:PriorityQueue 的 Sub-Queue 之一,是一个有序的 Heap 结构,按照 Pod 优先级从高到低递减的顺序存放待调度的 Pending Pod 相关信息,优先级最高的 Pod 信息在最上面,Pop Heap 时将得到最高优先级的 Pod 信息。

unschedulableQ:PriorityQueue 的 Sub-Queue 之一,主要是是一个无序的 Map,key 为 pod.Name + _ + pod.Namespace,value 为那些已经尝试调度并且调度失败的 UnSchedulable 的 Pod Object。

nominatedPods:为 Map 结构,key 为 node name,value 为该 Node 上 Nominated Pod Objects。当发生抢占调度时,preemptor pods 会打上 NominatedNodeName Annotation,表示经过抢占调度的逻辑后,该 Pod 希望能调度到 NominatedNodeName 这个 Node 上,调度时会考虑这个,防止高优先级的 Pods 进行抢占调度释放了低优先级 Pods 到它被再次调度这个时间段内,抢占的资源又被低优先级的 Pods 占用了。关于 scheduler 怎么处理 Nominated Pods,我后续会单独写篇博客来分析。

receivedMoveRequest:当 scheduler 将 Pods 从 unschedulableQ 移到 activeQ 时,这个值设为 true。当 scheduler 从 activeQ 中 Pop 一个 Pods 时,这个值设为 false。这表示当 scheduler 要调度某个 Pod 时是否接受到 Move 请求。当调度发生 Error 时,会尝试将 UnSchedulable Pod 重新加入到调度队列 (unSchedulableQ or activeQ) 中,这时只有当 receivedMoveRequest 为 false 并且该 Pod Condition Status 为 False 或者 Unschedulable 时,才会将该 Pod Add 到 unschedulableQ(或者 Update it)。

activeQ

active 是真正实现优先级调度的 Heap,我们继续看看这个 Heap 的实现。

type Heap struct {
 data *heapData

type heapItem struct {obj interface{} // The object which is stored in the heap. index int // The index of the object s key in the Heap.queue. }

heapData 是 activeQ 中真正用来存放 items 的结构:

items:Map 结构,key 为 Heap 中对象的 key,通过下面的 keyFunc 生成,value 为 heapItem 对象,heapItem 包括真正的 Pod Object 及其在 Heap 中的 index。

queue:string array,顺序存放 Pod 对应的 key,按照优先级从高到低的顺序对应 index 从 0 到高。

keyFunc:根据 Pod Object 生成对应的 key 的 Function,格式为 meta.GetNamespace() + / + meta.GetName。

lessFunc:用来根据 Pod 优先级比较 Heap 中的 Pod Object(然后决定其在 Heap 中的 index,index 为 0 的 Pod 优先级最高,随着 index 递增,Pod 优先级递减)。

NewPriorityQueue

在 scheduler config factory 创建时,会注册 podQueue 的创建 Func 为 NewSchedulingQueue。NewSchedulingQueue 会检查 PodPriority Feature Gate 是否 enable(截止 Kubernetes 1.10 版本,默认 disable),如果 PodPriority enable,则会 invoke NewPriorityQueue 创建 PriorityQueue 来管理未调度的 Pods。如果 PodPriority disable,则使用大家熟悉的 FIFO Queue。

func NewSchedulingQueue() SchedulingQueue {if util.PodPriorityEnabled() {return NewPriorityQueue()
 return NewFIFO()}

NewPriorityQueue 初始化优先级队列代码如下。

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue() *PriorityQueue {
 pq :=  PriorityQueue{activeQ: newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod),
 unschedulableQ: newUnschedulablePodsMap(),
 nominatedPods: map[string][]*v1.Pod{},
 pq.cond.L =  pq.lock
 return pq
}

主要初始化 activeQ、unschedulableQ、nominatedPods。

newHeap 初始化 activeQ 时,注册 heapData 对应的 keyFunc 和 lessFunc。

unschedulableQ 初始化时,注册 keyFunc。

cache.MetaNamespaceKeyFunc

newHeap 构建 activeQ 的时候,传入两个参数,第一个就是 keyFunc: MetaNamespaceKeyFunc。

func MetaNamespaceKeyFunc(obj interface{}) (string, error) {if key, ok := obj.(ExplicitKey); ok {return string(key), nil
 meta, err := meta.Accessor(obj)
 if err != nil {return  , fmt.Errorf( object has no meta: %v , err)
 if len(meta.GetNamespace())   0 {return meta.GetNamespace() +  /  + meta.GetName(), nil
 return meta.GetName(), nil}

MetaNamespaceKeyFunc 根据 Pod Object 生成对应的 key 的 Function,格式为 meta.GetNamespace() + / + meta.GetName。

util.HigherPriorityPod

newHeap 传入的第二个参数是 lessFunc:HigherPriorityPod。

const (
 DefaultPriorityWhenNoDefaultClassExists = 0
func HigherPriorityPod(pod1, pod2 interface{}) bool {return GetPodPriority(pod1.(*v1.Pod))   GetPodPriority(pod2.(*v1.Pod))
func GetPodPriority(pod *v1.Pod) int32 {
 if pod.Spec.Priority != nil {
 return *pod.Spec.Priority
 return scheduling.DefaultPriorityWhenNoDefaultClassExists
}

HigherPriorityPod 用来根据 Pod 优先级比较 Heap 中的 Pod Object,然后决定其在 Heap 中的 index。

index 为 0 的 Pod 优先级最高,随着 index 递增,Pod 优先级递减。

注意:如果 pod.Spec.Priority 为 nil(意味着这个 Pod 在创建时集群里还没有对应的 global default PriorityClass Object),并不是去把现在 global default PriorityClass 中的值设置给这个 Pod.Spec.Priority,而是设置为 0。个人觉得,设置为默认值比较合理。

newUnschedulablePodsMap

unschedulableQ 的构建是通过调用 newUnschedulablePodsMap 完成的,里面进行了 UnschedulablePodsMap 的 pods 的初始化,以及 pods map 中 keyFunc 的注册。

func newUnschedulablePodsMap() *UnschedulablePodsMap {
 return  UnschedulablePodsMap{pods: make(map[string]*v1.Pod),
 keyFunc: util.GetPodFullName,
func GetPodFullName(pod *v1.Pod) string {return pod.Name +  _  + pod.Namespace}

注意:unschedulableQ 中 keyFunc 实现的 key 生成规则是 pod.Name + _ + pod.Namespace,不同于 activeQ 中 keyFunc(格式为 meta.GetNamespace() + / + meta.GetName)。我也不理解为何要搞成两种不同的格式,统一按照 activeQ 中的 keyFunc 就很好。

Add Object to Heap

前面了解了 PriorityQueue 的结构,接着我们就要思考怎么往优先级 Heap(activeQ)中添加对象了。

func (h *Heap) Add(obj interface{}) error {key, err := h.data.keyFunc(obj)
 if err != nil {return cache.KeyError{Obj: obj, Err: err}
 if _, exists := h.data.items[key]; exists {h.data.items[key].obj = obj
 heap.Fix(h.data, h.data.items[key].index)
 } else {heap.Push(h.data,  itemKeyValue{key, obj})
 return nil
func Push(h Interface, x interface{}) {h.Push(x)
 up(h, h.Len()-1)
func up(h Interface, j int) {
 for {i := (j - 1) / 2 // parent
 if i == j || !h.Less(j, i) {
 break
 h.Swap(i, j)
 j = i
func (h *heapData) Less(i, j int) bool {if i   len(h.queue) || j   len(h.queue) {
 return false
 itemi, ok := h.items[h.queue[i]]
 if !ok {
 return false
 itemj, ok := h.items[h.queue[j]]
 if !ok {
 return false
 return h.lessFunc(itemi.obj, itemj.obj)
}

往 activeQ 中添加 Pod 时,如果该 Pod 已经存在,则根据其 PriorityClass Value 更新它在 heap 中的 index,否则把它 Push 入堆。

Push 和 Fix 类似,都需要对该 Pod 在 activeQ heap 中进行重新排序。排序时,通过 Less Func 进行比较,Less Func 最终就是 invoke 前面注册的 activeQ 中的 lessFunc,即 HigherPriorityPod。也就说 Push 和 Fix 时会根据 Pod 的优先级从高到低依次对应 index 从小到大。

Pop Object from Heap

使用 PriorityQueue 进行待调度 Pod 管理时,会从 activeQ 中 Pop 一个 Pod 出来,这个 Pod 是 heap 中的第一个 Pod,也是优先级最高的 Pod。

func (h *Heap) Pop() (interface{}, error) {obj := heap.Pop(h.data)
 if obj != nil {
 return obj, nil
 return nil, fmt.Errorf(object was removed from heap data)
func Pop(h Interface) interface{} {n := h.Len() - 1
 h.Swap(0, n)
 down(h, 0, n)
 return h.Pop()
func down(h Interface, i, n int) {
 for {
 j1 := 2*i + 1
 if j1  = n || j1   0 { // j1   0 after int overflow
 break
 j := j1 // left child
 if j2 := j1 + 1; j2   n   !h.Less(j1, j2) {
 j = j2 // = 2*i + 2 // right child
 if !h.Less(j, i) {
 break
 h.Swap(i, j)
 i = j
}

从 activeQ heap 中 Pop 一个 Pod 出来时,最终也是通过 Less Func 进行比较 (即 HigherPriorityPod) 找出最高优先级的 Pod。

Pod Queue Handler

了解了 PriorityQueue 及 Pod 进出 Heap 的原理之后,我们回到 Scheduler Config Factory,看看 Scheduler 中 podInformer、nodeInformer、serviceInformer、pvcInformer 等注册的 EventHandler 中对 PriorityQueue 的操作。

func NewConfigFactory(...) scheduler.Configurator {
 // scheduled pod cache
 podInformer.Informer().AddEventHandler(
 cache.FilteringResourceEventHandler{FilterFunc: func(obj interface{}) bool {switch t := obj.(type) {
 case *v1.Pod:
 return assignedNonTerminatedPod(t)
 case cache.DeletedFinalStateUnknown:
 if pod, ok := t.Obj.(*v1.Pod); ok {return assignedNonTerminatedPod(pod)
 runtime.HandleError(fmt.Errorf( unable to convert object %T to *v1.Pod in %T , obj, c))
 return false
 default:
 runtime.HandleError(fmt.Errorf( unable to handle object in %T: %T , c, obj))
 return false
 Handler: cache.ResourceEventHandlerFuncs{
 AddFunc: c.addPodToCache,
 UpdateFunc: c.updatePodInCache,
 DeleteFunc: c.deletePodFromCache,
 // unscheduled pod queue
 podInformer.Informer().AddEventHandler(
 cache.FilteringResourceEventHandler{FilterFunc: func(obj interface{}) bool {switch t := obj.(type) {
 case *v1.Pod:
 return unassignedNonTerminatedPod(t)
 case cache.DeletedFinalStateUnknown:
 if pod, ok := t.Obj.(*v1.Pod); ok {return unassignedNonTerminatedPod(pod)
 runtime.HandleError(fmt.Errorf( unable to convert object %T to *v1.Pod in %T , obj, c))
 return false
 default:
 runtime.HandleError(fmt.Errorf( unable to handle object in %T: %T , c, obj))
 return false
 Handler: cache.ResourceEventHandlerFuncs{
 AddFunc: c.addPodToSchedulingQueue,
 UpdateFunc: c.updatePodInSchedulingQueue,
 DeleteFunc: c.deletePodFromSchedulingQueue,
 // ScheduledPodLister is something we provide to plug-in functions that
 // they may need to call.
 c.scheduledPodLister = assignedPodLister{podInformer.Lister()}
 nodeInformer.Informer().AddEventHandler(
 cache.ResourceEventHandlerFuncs{
 AddFunc: c.addNodeToCache,
 UpdateFunc: c.updateNodeInCache,
 DeleteFunc: c.deleteNodeFromCache,
 c.nodeLister = nodeInformer.Lister()
 // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
 pvcInformer.Informer().AddEventHandler(
 cache.ResourceEventHandlerFuncs{
 AddFunc: c.onPvcAdd,
 UpdateFunc: c.onPvcUpdate,
 DeleteFunc: c.onPvcDelete,
 c.pVCLister = pvcInformer.Lister()
 // This is for ServiceAffinity: affected by the selector of the service is updated.
 // Also, if new service is added, equivalence cache will also become invalid since
 // existing pods may be  captured  by this service and change this predicate result.
 serviceInformer.Informer().AddEventHandler(
 cache.ResourceEventHandlerFuncs{
 AddFunc: c.onServiceAdd,
 UpdateFunc: c.onServiceUpdate,
 DeleteFunc: c.onServiceDelete,
 c.serviceLister = serviceInformer.Lister()}

PodInformer EventHandler for Scheduled Pod

通过 assignedNonTerminatedPod FilterFunc 过滤出那些已经 Scheduled 并且 NonTerminated Pods,然后再对这些 Pods 的 Add/Update/Delete Event Handler 进行注册,这里我们只关注对 PriorityQueue 的操作。

// assignedNonTerminatedPod selects pods that are assigned and non-terminal (scheduled and running).
func assignedNonTerminatedPod(pod *v1.Pod) bool {if len(pod.Spec.NodeName) == 0 {
 return false
 if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
 return false
 return true
}

addPodToCache Handler

注册 Add assignedNonTerminatedPod Event Handler 为 addPodToCache。

func (c *configFactory) addPodToCache(obj interface{}) {
 ...
 c.podQueue.AssignedPodAdded(pod)
// AssignedPodAdded is called when a bound pod is added. Creation of this pod
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {p.lock.Lock()
 defer p.lock.Unlock()
 for _, pod := range pods {if err := p.activeQ.Add(pod); err == nil {p.unschedulableQ.delete(pod)
 } else {glog.Errorf( Error adding pod %v to the scheduling queue: %v , pod.Name, err)
 p.receivedMoveRequest = true
 p.cond.Broadcast()
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches  pod .
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {p.lock.RLock()
 defer p.lock.RUnlock()
 var podsToMove []*v1.Pod
 for _, up := range p.unschedulableQ.pods {
 affinity := up.Spec.Affinity
 if affinity != nil   affinity.PodAffinity != nil {terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
 for _, term := range terms {namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(up,  term)
 selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
 if err != nil {glog.Errorf( Error getting label selectors for pod: %v. , up.Name)
 if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {podsToMove = append(podsToMove, up)
 break
 return podsToMove
}

addPodToCache 除了将 pod 加入到 schedulerCache 中之外,还会调用 podQueue.AssignedPodAdded。

对于 PriorityQueue 而言,AssignedPodAdded 负责 unSchedulableQ 中的 pods 进行与该 pod 的 Pod Affinity 检查,把那些满足 Pod Affinity 的 pods 从 unSchedulableQ 中移到 activeQ 中,待 scheduler 进行调度。

在这里要注意 movePodsToActiveQueue 中设置了 receivedMoveRequest 为 true。

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {p.lock.Lock()
 defer p.lock.Unlock()
 if p.unschedulableQ.get(pod) != nil {return fmt.Errorf( pod is already present in unschedulableQ)
 if _, exists, _ := p.activeQ.Get(pod); exists {return fmt.Errorf( pod is already present in the activeQ)
 if !p.receivedMoveRequest   isPodUnschedulable(pod) {p.unschedulableQ.addOrUpdate(pod)
 p.addNominatedPodIfNeeded(pod)
 return nil
 err := p.activeQ.Add(pod)
 if err == nil {p.addNominatedPodIfNeeded(pod)
 p.cond.Broadcast()
 return err
}

如果 receivedMoveRequest 为 false 并且该 Pod Condition Status 为 False 或者 Unschedulable 时,才会将该 Pod Add/Update 到 unschedulableQ,否则加入到 activeQ。

因此 receivedMoveRequest 设置错误可能会导致该 pod 本应该加入到 unSchedulableQ 中,却被加入到了 activeQ 中,这会导致 scheduler 多做一次无效的调度,当然这对性能的影响是很小的。

但是这里应该是有问题的,如果 getUnschedulablePodsWithMatchingAffinityTerm 得到的 podsToMove 数组为空时,并没有 pods 会真正从 unSchedulableQ 中移到 activeQ 中,此时 MoveRequest 是无效的,receivedMoveRequest 仍然应该为 false。

上面的 receivedMoveRequest 设置不对带来什么问题呢?当某个 pod 调度发生 Error 时会调用 AddUnschedulableIfNotPresent 将该 pod 加入到 unSchedulableQ 或者 activeQ 中。

updatePodInCache

注册 Update assignedNonTerminatedPod Event Handler 为 updatePodInCache。

func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
c.podQueue.AssignedPodUpdated(newPod)

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计12458字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)