共计 11465 个字符,预计需要花费 29 分钟才能阅读完成。
本篇内容主要讲解“如何实例化一个 Taint Manager”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“如何实例化一个 Taint Manager”吧!
NewNoExecuteTaintManager
PodInformer 添加 Event Handler 时,通过调用 taintManager.PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) 往 tc.podUpdateQueue 添加 updateItem。
NodeInformer 添加 Event Handler 时,通过调用 taintManager.NodeUpdated(oldNode *v1.Node, newNode *v1.Node) 往 tc.nodeUpdateQueue 添加 updateItem。
当创建 NodeController 时,如果 runTaintManager 为 true(通过 kube-controller-manager 的 –enable-taint-manager 中指定,默认为 true),则会通过 NewNoExecuteTaintManager 来实例化一个 Taint Manager。
pkg/controller/node/nodecontroller.go:195
func NewNodeController(..) (*NodeController, error) { podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {
if nc.taintManager != nil {nc.taintManager.PodUpdated(nil, pod)
} else {
nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs{AddFunc: func(originalObj interface{}) {
if nc.taintManager != nil {nc.taintManager.NodeUpdated(nil, node)
if nc.runTaintManager {nc.taintManager = NewNoExecuteTaintManager(kubeClient)
...
return nc, nil
}
因此,创建 NodeController 时已经配置了监听 pod 和 node 的事件,并会将相关数据发送到 tc.podUpdateQueue 和 tc.nodeUpdateQueue,然后由 Taint Manager 从中取出数据进行处理。在此之前,我们先来看看 NewNoExecuteTaintManager 是如何实例化一个 Taint Manager 的。
pkg/controller/node/taint_controller.go:152
func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
tm := NoExecuteTaintManager{
client: c,
recorder: recorder,
// taintedNodes 记录每个 Node 对应的 Taint 信息。taintedNodes: make(map[string][]v1.Taint),
// nodeUpdateQueue 中取出的 updateItem 会发送到 nodeUpdateChannel,Tait Manager 从该 Channel 中取出对应的 node update info。nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),
// podUpdateQueue 中取出的 updateItem 会发送到 podUpdateChannel,Tait Manager 从该 Channel 中取出对应的 pod update info。podUpdateChannel: make(chan *podUpdateItem, podUpdateChannelSize),
// Node Controller 监听到的 node update info 会发送到 nodeUpdateQueue。nodeUpdateQueue: workqueue.New(),
// Node Controller 监听到的 pod update info 会发送到 podUpdateQueue。podUpdateQueue: workqueue.New(),
// CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute deletePodHandler.
tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))
return tm
}
相关的代码分析见里面的代码注释。需要强调的是,我们在这里给 tm.taintEvictionQueue 注册了函数 deletePodHandler,用来通过 Taint Eviction 时删除 pod 时调用。Taint Manager Run 的时候会通过 tc.taintEvictionQueue.AddWork() 时创建 Worker 来执行 deletePodHandler。
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {return func(args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
glog.V(0).Infof(NoExecuteTaintManager is deleting Pod: %v , args.NamespacedName.String())
if emitEventFunc != nil {emitEventFunc(args.NamespacedName)
var err error
// 按照失败重试 5 次,每次间隔 10s 的重试机制,调用 apiserver 的 api 删除对应的 Pod。for i := 0; i retries; i++ {err = c.Core().Pods(ns).Delete(name, metav1.DeleteOptions{})
if err == nil {
break
time.Sleep(10 * time.Millisecond)
return err
}
Run
在 Kubernetes Node Controller 源码分析之执行篇中提到,在 Node Controller Run 的时候,如果 runTaintManager 为 true,则会调用 nc.taintManager.Run 启动 Taint Manager loop。
pkg/controller/node/nodecontroller.go:550
func (nc *NodeController) Run() {go func() {
if nc.runTaintManager {go nc.taintManager.Run(wait.NeverStop)
}
接下来,我们来看 Taint Manager 的 Run 方法。Node Controller 启动的 Taint Manager 实例其实就是 NoExecuteTaintManager,其对应的 Run 方法代码如下。
pkg/controller/node/taint_controller.go:179
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh -chan struct{}) {glog.V(0).Infof(Starting NoExecuteTaintManager)
// Functions that are responsible for taking work items out of the workqueues and putting them into channels.
// 从 tc.nodeUpdateQueue 中获取 updateItem,并发送到 tc.nodeUpdateChannel。go func(stopCh -chan struct{}) {
for {item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
nodeUpdate := item.(*nodeUpdateItem)
select {
case -stopCh:
break
case tc.nodeUpdateChannel - nodeUpdate:
}(stopCh)
// 从 tc.podUpdateQueue 中获取 updateItem,并发送到 tc.podUpdateChannel。go func(stopCh -chan struct{}) {
for {item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
podUpdate := item.(*podUpdateItem)
select {
case -stopCh:
break
case tc.podUpdateChannel - podUpdate:
}(stopCh)
// When processing events we want to prioritize Node updates over Pod updates,
// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
// we don t want user (or system) to wait until PodUpdate queue is drained before it can
// start evicting Pods from tainted Nodes.
for {
select {
case -stopCh:
break
// 从 tc.nodeUpdateChannel 获取 nodeUpdate 数据,然后 invoke tc.handleNodeUpdate 进行处理。case nodeUpdate := -tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
// 从 tc.podUpdateChannel 获取 podUpdate 数据,在 invoke tc.handlePodUpdate 进行处理之前,先确保 tc.nodeUpdateQueue 中的数据已经被处理完。case podUpdate := -tc.podUpdateChannel:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := -tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
default:
break priority
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(podUpdate)
}
可见, Run 方法中分别从对应的 queue 中取出数据,然后调用 tc.handleNodeUpdate 和 tc.handlePodUpdate 进行处理。
// pkg/controller/node/taint_controller.go:365
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
// Delete
// 如果 nodeUpdate.newNode == nil,则表明该 Node 被删除了,那么将该 Node 的 Taints 信息从 tc.taintedNodes 缓存中删除。if nodeUpdate.newNode == nil {
node := nodeUpdate.oldNode
glog.V(4).Infof(Noticed node deletion: %#v , node.Name)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, node.Name)
return
// Create or Update
// 如果是 Node Create 或者 Node Update Event,则更新 tc.taintedNodes 缓存中记录的该 Node 的 Taints 信息。glog.V(4).Infof(Noticed node update: %#v , nodeUpdate)
node := nodeUpdate.newNode
taints := nodeUpdate.newTaints
func() {tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
glog.V(4).Infof(Updating known taints on node %v: %v , node.Name, taints)
if len(taints) == 0 {delete(tc.taintedNodes, node.Name)
} else {tc.taintedNodes[node.Name] = taints
// 然后,获取该 Node 上所有 pods list。pods, err := getPodsAssignedToNode(tc.client, node.Name)
if err != nil {glog.Errorf(err.Error())
return
if len(pods) == 0 {
return
// Short circuit, to make this controller a bit faster.
// 如果该 Node 上的 Taints 被删除了,则取消所有该 node 上的 pod evictions。if len(taints) == 0 {glog.V(4).Infof(All taints were removed from the Node %v. Cancelling all evictions... , node.Name)
for i := range pods {tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
return
// 否则,调用 tc.processPodOnNode 根据 Node Taints info 和 Pod Tolerations info 处理该 Node 上的 Pod Eviction。now := time.Now()
for i := range pods {pod := pods[i]
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
handleNodeUpdate 的逻辑为:
如果 nodeUpdate.newNode == nil,则表明该 Node 被删除了,那么将该 Node 的 Taints 信息从 tc.taintedNodes 缓存中删除。
如果是 Node Create 或者 Node Update Event,则更新 tc.taintedNodes 缓存中记录的该 Node 的 Taints 信息。
获取该 Node 上所有 pods list。
如果该 Node 上的 Taints 被删除了,则取消所有该 node 上的 pod evictions。
否则,遍历 pods list 中的每个 pod,分别调用 tc.processPodOnNode 根据 Node Taints info 和 Pod Tolerations info 处理该 Node 上的 Pod Eviction。
// pkg/controller/node/taint_controller.go:334
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
// Delete
// 如果 podUpdate.newPod == nil,则表明该 Pod 被删除了,那么取消该 Pod Evictions。if podUpdate.newPod == nil {
pod := podUpdate.oldPod
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
glog.V(4).Infof(Noticed pod deletion: %#v , podNamespacedName)
tc.cancelWorkWithEvent(podNamespacedName)
return
// Create or Update
// 如果是 Pod Create 或者 Pod Update Event,则取出该 pod 的 node 上的 Taints info。pod := podUpdate.newPod
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
glog.V(4).Infof(Noticed pod update: %#v , podNamespacedName)
nodeName := pod.Spec.NodeName
if nodeName == {
return
taints, ok := func() ([]v1.Taint, bool) {tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
// It s possible that Node was deleted, or Taints were removed before, which triggered
// eviction cancelling if it was needed.
if !ok {
return
// 然后,调用 tc.processPodOnNode 根据 Node Taints info 和 Pod Tolerations info 处理该 Node 上的 Pod Eviction。tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now())
}
handlePodUpdate 的逻辑为:
如果 podUpdate.newPod == nil,则表明该 Pod 被删除了,那么取消该 Pod Evictions。
如果是 Pod Create 或者 Pod Update Event,则取出该 pod 的 node 上的 Taints info。
如果 node 上的 Taints info 信息为空,表明 Taints info 被删除了或者 Node 被删除了,那么就不需要处理该 node 上的 pod eviction 了, 流程结束。
否则,调用 tc.processPodOnNode 根据 Node Taints info 和 Pod Tolerations info 处理该 Node 上的 Pod Eviction。
因此,不管是 handlePodUpdate 还是 handleNodeUpdate, 最终都是通过 processPodOnNode 来处理 Pod Eviction 的。
pkg/controller/node/taint_controller.go:295
func (tc *NoExecuteTaintManager) processPodOnNode(
podNamespacedName types.NamespacedName,
nodeName string,
tolerations []v1.Toleration,
taints []v1.Taint,
now time.Time,
) {
// 如果该 node 的 taints info 为空,则取消 Taint Eviction Pods。if len(taints) == 0 {tc.cancelWorkWithEvent(podNamespacedName)
// 对比 node 的 taints info 和 pod tolerations info,判断出 node 的 taints 是否都能被 pod 所能容忍。allTolerated, usedTolerations := v1.GetMatchingTolerations(taints, tolerations)
// 如果不是全部都能容忍,那么调用立刻调用 AddWork 来创建 worker,启动 tc.taintEvictionQueue 注册的 deletePodHandler 来删除该 pod。if !allTolerated {glog.V(2).Infof(Not all taints are tolerated after update for Pod %v on %v , podNamespacedName.String(), nodeName)
// We re canceling scheduled work (if any), as we re going to delete the Pod right away.
tc.cancelWorkWithEvent(podNamespacedName)
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
return
// 否则,取 pod 的所有 tolerations 的 TolerationSeconds 的最小值作为 minTolerationTime。如果某个 Toleration 没有设置 TolerationSeconds,则表示 0,如果设置的值为负数,则用 0 替代。minTolerationTime := getMinTolerationTime(usedTolerations)
// getMinTolerationTime returns negative value to denote infinite toleration.
if minTolerationTime 0 {glog.V(4).Infof(New tolerations for %v tolerate forever. Scheduled deletion won t be cancelled if already scheduled. , podNamespacedName.String())
return
startTime := now
triggerTime := startTime.Add(minTolerationTime)
// 从 tc.taintEvictionQueue 中获取 Worker-scheduledEviction
scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
// 如果获取到不为空的 scheduledEviction,则判断 worker 创建时间加上 minTolerationTime 是否达到触发时间要求,如果没达到,则不进行 Taint Pod Eviction,流程结束。if scheduledEviction != nil {
startTime = scheduledEviction.CreatedAt
if startTime.Add(minTolerationTime).Before(triggerTime) {return} else {tc.cancelWorkWithEvent(podNamespacedName)
// 如果达到触发时间要求,则取消 worker,并立刻调用 AddWork 来创建 worker,启动 tc.taintEvictionQueue 注册的 deletePodHandler 来删除该 pod。tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
processPodOnNode 的逻辑为:
如果该 node 的 taints info 为空,则取消 Taint Eviction Pods。
对比 node 的 taints info 和 pod tolerations info,判断出 node 的 taints 是否都能被 pod 所能容忍。
如果不是全部都能容忍,那么调用立刻调用 AddWork 来创建 worker,启动 tc.taintEvictionQueue 注册的 deletePodHandler 来删除该 pod。
否则,取 pod 的所有 tolerations 的 TolerationSeconds 的最小值作为 minTolerationTime。如果某个 Toleration 没有设置 TolerationSeconds,表示不作驱逐。
如果获取到不为空的 scheduledEviction,则判断 worker 创建时间加上 minTolerationTime 是否达到触发时间要求,如果没达到,则不进行 Taint Pod Eviction,流程结束。
如果达到触发时间要求,则取消 worker,并立刻调用 AddWork 来创建 worker,启动 tc.taintEvictionQueue 注册的 deletePodHandler 来删除该 pod。
如果 minTolerationTime 小于 0,则永远容忍,流程结束。
从 tc.taintEvictionQueue 中获取 Worker-scheduledEviction。
到此,相信大家对“如何实例化一个 Taint Manager”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!