Kubernetes Job Controller怎么构造

64次阅读
没有评论

共计 16744 个字符,预计需要花费 42 分钟才能阅读完成。

这篇文章主要讲解了“Kubernetes Job Controller 怎么构造”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“Kubernetes Job Controller 怎么构造”吧!

实现流程图

废话不多说,先把完整流程贴出来。

New JobController

type JobController struct {
 kubeClient clientset.Interface
 podControl controller.PodControlInterface
 // To allow injection of updateJobStatus for testing.
 updateHandler func(job *batch.Job) error
 syncHandler func(jobKey string) (bool, error)
 // podStoreSynced returns true if the pod store has been synced at least once.
 // Added as a member to the struct to allow injection for testing.
 podStoreSynced cache.InformerSynced
 // jobStoreSynced returns true if the job store has been synced at least once.
 // Added as a member to the struct to allow injection for testing.
 jobStoreSynced cache.InformerSynced
 // A TTLCache of pod creates/deletes each rc expects to see
 expectations controller.ControllerExpectationsInterface
 // A store of jobs
 jobLister batchv1listers.JobLister
 // A store of pods, populated by the podController
 podStore corelisters.PodLister
 // Jobs that need to be updated
 queue workqueue.RateLimitingInterface
 recorder record.EventRecorder

func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. eventBroadcaster.StartRecordingToSink(v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events()}) if kubeClient != nil   kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {metrics.RegisterMetricAndTrackRateLimiterUsage( job_controller , kubeClient.CoreV1().RESTClient().GetRateLimiter()) jm :=  JobController{ kubeClient: kubeClient, podControl: controller.RealPodControl{ KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component:  job-controller}), expectations: controller.NewControllerExpectations(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff),  job ), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component:  job-controller}), jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: jm.enqueueController, UpdateFunc: jm.updateJob, DeleteFunc: jm.enqueueController, jm.jobLister = jobInformer.Lister() jm.jobStoreSynced = jobInformer.Informer().HasSynced podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: jm.addPod, UpdateFunc: jm.updatePod, DeleteFunc: jm.deletePod, jm.podStore = podInformer.Lister() jm.podStoreSynced = podInformer.Informer().HasSynced jm.updateHandler = jm.updateJobStatus jm.syncHandler = jm.syncJob return jm }

构造 JobController,并初始化相关数据,比如 rate limiter queue;

watch pod and job object;

注册 podInformer 的 add/del/update EventHandler;

注册 jobInformer 的 add/del/update EventHandler;

注册 updataHandler 为 updateJobStatus,用来更新 Job 状态;

注册 syncHandler 为 syncJob,用来进行处理 queue 中的 Job;

JobController Run

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh  -chan struct{}) {defer utilruntime.HandleCrash()
 defer jm.queue.ShutDown()
 glog.Infof(Starting job controller)
 defer glog.Infof(Shutting down job controller)
 if !controller.WaitForCacheSync(job , stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
 return
 for i := 0; i   workers; i++ {go wait.Until(jm.worker, time.Second, stopCh)
 -stopCh
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {for jm.processNextWorkItem() {func (jm *JobController) processNextWorkItem() bool {key, quit := jm.queue.Get()
 if quit {
 return false
 defer jm.queue.Done(key)
 forget, err := jm.syncHandler(key.(string))
 if err == nil {
 if forget {jm.queue.Forget(key)
 return true
 utilruntime.HandleError(fmt.Errorf( Error syncing job: %v , err))
 jm.queue.AddRateLimited(key)
 return true
}

WaitForCacheSync 等待 jobController cache 同步;

启动 5 个 goruntine,每个协程分别执行 worker,每个 worker 执行完后等待 1s,继续执行,如此循环;

worker 负责从从 queue 中 get job key,对每个 job,调用 syncJob 进行同步,如果 syncJob 成功,则 forget the job(其实就是让 rate limiter 停止 tracking it),否则将该 key 再次加入到 queue 中,等待下次 sync。

syncJob

// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *JobController) syncJob(key string) (bool, error) {startTime := time.Now()
 defer func() {glog.V(4).Infof(Finished syncing job %q (%v) , key, time.Now().Sub(startTime))
 ns, name, err := cache.SplitMetaNamespaceKey(key)
 if err != nil {
 return false, err
 if len(ns) == 0 || len(name) == 0 {return false, fmt.Errorf( invalid job key %q: either namespace or name is missing , key)
 sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
 if err != nil {if errors.IsNotFound(err) {glog.V(4).Infof(Job has been deleted: %v , key)
 jm.expectations.DeleteExpectations(key)
 return true, nil
 return false, err
 job := *sharedJob
 // if job was finished previously, we don t want to redo the termination
 if IsJobFinished(job) {
 return true, nil
 // retrieve the previous number of retry
 previousRetry := jm.queue.NumRequeues(key)
 // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 // and update the expectations after we ve retrieved active pods from the store. If a new pod enters
 // the store after we ve checked the expectation, the job sync is just deferred till the next relist.
 jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
 pods, err := jm.getPodsForJob(job)
 if err != nil {
 return false, err
 activePods := controller.FilterActivePods(pods)
 active := int32(len(activePods))
 succeeded, failed := getStatus(pods)
 conditions := len(job.Status.Conditions)
 // job first start
 if job.Status.StartTime == nil {now := metav1.Now()
 job.Status.StartTime =  now
 // enqueue a sync to check if job past ActiveDeadlineSeconds
 if job.Spec.ActiveDeadlineSeconds != nil {glog.V(4).Infof( Job %s have ActiveDeadlineSeconds will sync after %d seconds ,
 key, *job.Spec.ActiveDeadlineSeconds)
 jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
 var manageJobErr error
 jobFailed := false
 var failureReason string
 var failureMessage string
 jobHaveNewFailure := failed   job.Status.Failed
 // check if the number of failed jobs increased since the last syncJob
 if jobHaveNewFailure   (int32(previousRetry)+1   *job.Spec.BackoffLimit) {
 jobFailed = true
 failureReason =  BackoffLimitExceeded 
 failureMessage =  Job has reach the specified backoff limit 
 } else if pastActiveDeadline(job) {
 jobFailed = true
 failureReason =  DeadlineExceeded 
 failureMessage =  Job was active longer than specified deadline 
 if jobFailed {errCh := make(chan error, active)
 jm.deleteJobPods(job, activePods, errCh)
 select {
 case manageJobErr =  -errCh:
 if manageJobErr != nil {
 break
 default:
 // update status values accordingly
 failed += active
 active = 0
 job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
 jm.recorder.Event(job, v1.EventTypeWarning, failureReason, failureMessage)
 } else {
 if jobNeedsSync   job.DeletionTimestamp == nil {active, manageJobErr = jm.manageJob(activePods, succeeded,  job)
 completions := succeeded
 complete := false
 if job.Spec.Completions == nil {
 // This type of job is complete when any pod exits with success.
 // Each pod is capable of
 // determining whether or not the entire Job is done. Subsequent pods are
 // not expected to fail, but if they do, the failure is ignored. Once any
 // pod succeeds, the controller waits for remaining pods to finish, and
 // then the job is complete.
 if succeeded   0   active == 0 {complete = true} else {
 // Job specifies a number of completions. This type of job signals
 // success by having that number of successes. Since we do not
 // start more pods than there are remaining completions, there should
 // not be any remaining active pods once this count is reached.
 if completions  = *job.Spec.Completions {
 complete = true
 if active   0 {jm.recorder.Event( job, v1.EventTypeWarning,  TooManyActivePods ,  Too many active pods running after completion count reached)
 if completions   *job.Spec.Completions {jm.recorder.Event( job, v1.EventTypeWarning,  TooManySucceededPods ,  Too many succeeded pods running after completion count reached)
 if complete {job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete,  , ))
 now := metav1.Now()
 job.Status.CompletionTime =  now
 forget := false
 // no need to update the job if the status hasn t changed since last time
 if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
 job.Status.Active = active
 job.Status.Succeeded = succeeded
 job.Status.Failed = failed
 if err := jm.updateHandler(job); err != nil {
 return false, err
 if jobHaveNewFailure   !IsJobFinished(job) {
 // returning an error will re-enqueue Job after the backoff period
 return false, fmt.Errorf(failed pod(s) detected for job key %q , key)
 forget = true
 return forget, manageJobErr
}

从 Indexer 中查找指定的 Job 是否存在,如果不存在,则从 expectations 中删除该 job,流程结束返回 true。否则继续下面流程。

根据 JobCondition Complete or Failed 判断 Job 是否 Finished,如果 Finished,则流程结束返回 true,否则继续下面流程。

调用 SatisfiedExpectations,如果 ControlleeExpectations 中待 add 和 del 都 =0,或者 expectations 已经超过 5 分钟没更新过了,则返回 jobNeedsSync=true,表示需要进行一次 manageJob 了。

对于那些第一次启动的 jobs (StartTime==nil), 需要把设置 StartTime,并且如果 ActiveDeadlineSeconds 不为空,则经过 ActiveDeadlineSeconds 后再次把该 job 加入到 queue 中进行 sync。

获取该 job 管理的所有 pods,过滤出 activePods,计算出 actived,successed,failed pods 的数量。如果 failed job.Status.Failed,说明该 job 又有新 failed Pods 了,则 jobHaveNewFailure 为 true。

如果 jobHaveNewFailure,并且 queue 记录的该 job retry 次数加 1,比 job.Spec.BackoffLimit(默认为 6),则表示该 job BackoffLimitExceeded,jobFailed。如果 job StartTime 到现在为止的历时 =ActiveDeadlineSeconds,则表示该 job DeadlineExceeded,jobFailed。

如果 jobFailed,则用 sync.WaitGroup 并发等待删除所有的前面过滤出来的 activePods,删除成功,则 failed += acitve, active = 0, 并设置 Condition Failed 为 true。

如果 job not failed, jobNeedSync 为 true,并且 job 的 DeletionTimestamp 为空(没有标记为删除),则调用 manageJob 对 Job 管理的 pods 根据复杂的策略进行 add or del。

如果 job not failed 且 job.Spec.Completions 为 nil,表示 This type of job is complete when any pod exits with success。因此如果 succeeded 0 active == 0,则表示 job completed。

如果如果 job not failed 且 job.Spec.Completions 不为 nil,表示 This type of job signals success by having that number of successes。因此如果 succeeded = job.Spec.Completions,则表示 job completed。

如果 job completed,则更新其 Conditions Complete 为 true,并设置 CompletionTime。

接下来 invoke updateJobStatus 更新 etcd 中 job 状态,如果更新失败,则返回 false,该 job 将再次加入 queue。如果 jobHaveNewFailure 为 true,并且 Job Condition 显示该 Job not Finished,则返回 false,该 job 将再次加入 queue。

manageJob

// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify  activePods .
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
 var activeLock sync.Mutex
 active := int32(len(activePods))
 parallelism := *job.Spec.Parallelism
 jobKey, err := controller.KeyFunc(job)
 if err != nil {utilruntime.HandleError(fmt.Errorf( Couldn t get key for job %#v: %v , job, err))
 return 0, nil
 var errCh chan error
 if active   parallelism {
 diff := active - parallelism
 errCh = make(chan error, diff)
 jm.expectations.ExpectDeletions(jobKey, int(diff))
 glog.V(4).Infof(Too many pods running job %q, need %d, deleting %d , jobKey, parallelism, diff)
 // Sort the pods in the order such that not-ready   ready, unscheduled
 //   scheduled, and pending   running. This ensures that we delete pods
 // in the earlier stages whenever possible.
 sort.Sort(controller.ActivePods(activePods))
 active -= diff
 wait := sync.WaitGroup{}
 wait.Add(int(diff))
 for i := int32(0); i   diff; i++ {go func(ix int32) {defer wait.Done()
 if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {defer utilruntime.HandleError(err)
 // Decrement the expected number of deletes because the informer won t observe this deletion
 glog.V(2).Infof(Failed to delete %v, decrementing expectations for job %q/%q , activePods[ix].Name, job.Namespace, job.Name)
 jm.expectations.DeletionObserved(jobKey)
 activeLock.Lock()
 active++
 activeLock.Unlock()
 errCh  - err
 }(i)
 wait.Wait()} else if active   parallelism {wantActive := int32(0)
 if job.Spec.Completions == nil {
 // Job does not specify a number of completions. Therefore, number active
 // should be equal to parallelism, unless the job has seen at least
 // once success, in which leave whatever is running, running.
 if succeeded   0 {wantActive = active} else {wantActive = parallelism} else {
 // Job specifies a specific number of completions. Therefore, number
 // active should not ever exceed number of remaining completions.
 wantActive = *job.Spec.Completions - succeeded
 if wantActive   parallelism {
 wantActive = parallelism
 diff := wantActive - active
 if diff   0 {utilruntime.HandleError(fmt.Errorf( More active than wanted: job %q, want %d, have %d , jobKey, wantActive, active))
 diff = 0
 jm.expectations.ExpectCreations(jobKey, int(diff))
 errCh = make(chan error, diff)
 glog.V(4).Infof(Too few pods running job %q, need %d, creating %d , jobKey, wantActive, diff)
 active += diff
 wait := sync.WaitGroup{}
 // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
 // and double with each successful iteration in a kind of  slow start .
 // This handles attempts to start large numbers of pods that would
 // likely all fail with the same error. For example a project with a
 // low quota that attempts to create a large number of pods will be
 // prevented from spamming the API service with the pod create requests
 // after one of its pods fails. Conveniently, this also prevents the
 // event spam that those failures would generate.
 for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff   0; batchSize = integer.Int32Min(2*batchSize, diff) {errorCount := len(errCh)
 wait.Add(int(batchSize))
 for i := int32(0); i   batchSize; i++ {go func() {defer wait.Done()
 err := jm.podControl.CreatePodsWithControllerRef(job.Namespace,  job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
 if err != nil   errors.IsTimeout(err) {
 // Pod is created but its initialization has timed out.
 // If the initialization is successful eventually, the
 // controller will observe the creation via the informer.
 // If the initialization fails, or if the pod keeps
 // uninitialized for a long time, the informer will not
 // receive any update, and the controller will create a new
 // pod when the expectation expires.
 return
 if err != nil {defer utilruntime.HandleError(err)
 // Decrement the expected number of creates because the informer won t observe this pod
 glog.V(2).Infof(Failed creation, decrementing expectations for job %q/%q , job.Namespace, job.Name)
 jm.expectations.CreationObserved(jobKey)
 activeLock.Lock()
 active--
 activeLock.Unlock()
 errCh  - err
 wait.Wait()
 // any skipped pods that we never attempted to start shouldn t be expected.
 skippedPods := diff - batchSize
 if errorCount   len(errCh)   skippedPods   0 {glog.V(2).Infof(Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q , skippedPods, job.Namespace, job.Name)
 active -= skippedPods
 for i := int32(0); i   skippedPods; i++ {
 // Decrement the expected number of creates because the informer won t observe this pod
 jm.expectations.CreationObserved(jobKey)
 // The skipped pods will be retried later. The next controller resync will
 // retry the slow start process.
 break
 diff -= batchSize
 select {
 case err :=  -errCh:
 // all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
 if err != nil {
 return active, err
 default:
 return active, nil
}

如果 active job.Spec.Parallelism, 表示要 scale down:

计算 active 与 parallelism 的差值 diff,修改 ControllerExpectations 中该 job 的 dels 为 diff,表示要删除 diff 这么多的 pod。

计算 active 与 parallelism 的差值 diff,修改 ControllerExpectations 中该 job 的 dels 为 diff,表示要删除 diff 这么多的 pod。

将 activePods 中的 Pods 按照 not-ready ready, unscheduled scheduled, pending running 进行排序,确保先删除 stage 越早的 pods。

更新 active (active 减去 diff),用 sync.WaitGroup 并发等待删除 etcd 中那些 Pods。如果删除某个 Pod 失败,active 要加 1,expectations 中 dels 要减 1.

返回 active

如果 active job.Spec.Parallelism,表示要 scale up:

如果 job.Spec.Completions 为 nil,且 succeeded 大于 0,则 diff 设为 0;如果 job.Spec.Completions 为 nil,但 successed = 0,则 diff 为 parallelism-active;如果 job.Spec.Completions 不为 nil,则 diff 为 max(job.Spec.Completions – succeeded,parallelim) – active;

修改 ControllerExpectations 中该 job 的 adds 为 diff,表示要新增 diff 这么多的 pod。

更新 active (active 加上 diff),用 sync.WaitGroup 分批的创建 Pods,第一批创建 1 个 ( 代码写死 SlowStartInitialBatchSize = 1),第二批创建 2,然后 4,8,16… 这样下去,但是每次不能超过 diff 的值。每一批创建 pod 后,注意更新 diff 的值(减去 batchsize)。如果某一批创建过程 Pods 中存在失败情况,则更新 active 和 expectations 中 adds,且不进行后续未启动的批量创建 pods 行为。

如果 active == job.Spec.Parallelism,返回 active。

感谢各位的阅读,以上就是“Kubernetes Job Controller 怎么构造”的内容了,经过本文的学习后,相信大家对 Kubernetes Job Controller 怎么构造这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计16744字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)