共计 16744 个字符,预计需要花费 42 分钟才能阅读完成。
这篇文章主要讲解了“Kubernetes Job Controller 怎么构造”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“Kubernetes Job Controller 怎么构造”吧!
实现流程图
废话不多说,先把完整流程贴出来。
New JobController
type JobController struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
// To allow injection of updateJobStatus for testing.
updateHandler func(job *batch.Job) error
syncHandler func(jobKey string) (bool, error)
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced
// jobStoreSynced returns true if the job store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
jobStoreSynced cache.InformerSynced
// A TTLCache of pod creates/deletes each rc expects to see
expectations controller.ControllerExpectationsInterface
// A store of jobs
jobLister batchv1listers.JobLister
// A store of pods, populated by the podController
podStore corelisters.PodLister
// Jobs that need to be updated
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events()})
if kubeClient != nil kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {metrics.RegisterMetricAndTrackRateLimiterUsage( job_controller , kubeClient.CoreV1().RESTClient().GetRateLimiter())
jm := JobController{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: job-controller}),
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), job ),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: job-controller}),
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController,
UpdateFunc: jm.updateJob,
DeleteFunc: jm.enqueueController,
jm.jobLister = jobInformer.Lister()
jm.jobStoreSynced = jobInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.addPod,
UpdateFunc: jm.updatePod,
DeleteFunc: jm.deletePod,
jm.podStore = podInformer.Lister()
jm.podStoreSynced = podInformer.Informer().HasSynced
jm.updateHandler = jm.updateJobStatus
jm.syncHandler = jm.syncJob
return jm
}
构造 JobController,并初始化相关数据,比如 rate limiter queue;
watch pod and job object;
注册 podInformer 的 add/del/update EventHandler;
注册 jobInformer 的 add/del/update EventHandler;
注册 updataHandler 为 updateJobStatus,用来更新 Job 状态;
注册 syncHandler 为 syncJob,用来进行处理 queue 中的 Job;
JobController Run
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh -chan struct{}) {defer utilruntime.HandleCrash()
defer jm.queue.ShutDown()
glog.Infof(Starting job controller)
defer glog.Infof(Shutting down job controller)
if !controller.WaitForCacheSync(job , stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
return
for i := 0; i workers; i++ {go wait.Until(jm.worker, time.Second, stopCh)
-stopCh
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {for jm.processNextWorkItem() {func (jm *JobController) processNextWorkItem() bool {key, quit := jm.queue.Get()
if quit {
return false
defer jm.queue.Done(key)
forget, err := jm.syncHandler(key.(string))
if err == nil {
if forget {jm.queue.Forget(key)
return true
utilruntime.HandleError(fmt.Errorf( Error syncing job: %v , err))
jm.queue.AddRateLimited(key)
return true
}
WaitForCacheSync 等待 jobController cache 同步;
启动 5 个 goruntine,每个协程分别执行 worker,每个 worker 执行完后等待 1s,继续执行,如此循环;
worker 负责从从 queue 中 get job key,对每个 job,调用 syncJob 进行同步,如果 syncJob 成功,则 forget the job(其实就是让 rate limiter 停止 tracking it),否则将该 key 再次加入到 queue 中,等待下次 sync。
syncJob
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *JobController) syncJob(key string) (bool, error) {startTime := time.Now()
defer func() {glog.V(4).Infof(Finished syncing job %q (%v) , key, time.Now().Sub(startTime))
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return false, err
if len(ns) == 0 || len(name) == 0 {return false, fmt.Errorf( invalid job key %q: either namespace or name is missing , key)
sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if err != nil {if errors.IsNotFound(err) {glog.V(4).Infof(Job has been deleted: %v , key)
jm.expectations.DeleteExpectations(key)
return true, nil
return false, err
job := *sharedJob
// if job was finished previously, we don t want to redo the termination
if IsJobFinished(job) {
return true, nil
// retrieve the previous number of retry
previousRetry := jm.queue.NumRequeues(key)
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we ve retrieved active pods from the store. If a new pod enters
// the store after we ve checked the expectation, the job sync is just deferred till the next relist.
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
pods, err := jm.getPodsForJob(job)
if err != nil {
return false, err
activePods := controller.FilterActivePods(pods)
active := int32(len(activePods))
succeeded, failed := getStatus(pods)
conditions := len(job.Status.Conditions)
// job first start
if job.Status.StartTime == nil {now := metav1.Now()
job.Status.StartTime = now
// enqueue a sync to check if job past ActiveDeadlineSeconds
if job.Spec.ActiveDeadlineSeconds != nil {glog.V(4).Infof( Job %s have ActiveDeadlineSeconds will sync after %d seconds ,
key, *job.Spec.ActiveDeadlineSeconds)
jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
var manageJobErr error
jobFailed := false
var failureReason string
var failureMessage string
jobHaveNewFailure := failed job.Status.Failed
// check if the number of failed jobs increased since the last syncJob
if jobHaveNewFailure (int32(previousRetry)+1 *job.Spec.BackoffLimit) {
jobFailed = true
failureReason = BackoffLimitExceeded
failureMessage = Job has reach the specified backoff limit
} else if pastActiveDeadline(job) {
jobFailed = true
failureReason = DeadlineExceeded
failureMessage = Job was active longer than specified deadline
if jobFailed {errCh := make(chan error, active)
jm.deleteJobPods(job, activePods, errCh)
select {
case manageJobErr = -errCh:
if manageJobErr != nil {
break
default:
// update status values accordingly
failed += active
active = 0
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
jm.recorder.Event(job, v1.EventTypeWarning, failureReason, failureMessage)
} else {
if jobNeedsSync job.DeletionTimestamp == nil {active, manageJobErr = jm.manageJob(activePods, succeeded, job)
completions := succeeded
complete := false
if job.Spec.Completions == nil {
// This type of job is complete when any pod exits with success.
// Each pod is capable of
// determining whether or not the entire Job is done. Subsequent pods are
// not expected to fail, but if they do, the failure is ignored. Once any
// pod succeeds, the controller waits for remaining pods to finish, and
// then the job is complete.
if succeeded 0 active == 0 {complete = true} else {
// Job specifies a number of completions. This type of job signals
// success by having that number of successes. Since we do not
// start more pods than there are remaining completions, there should
// not be any remaining active pods once this count is reached.
if completions = *job.Spec.Completions {
complete = true
if active 0 {jm.recorder.Event( job, v1.EventTypeWarning, TooManyActivePods , Too many active pods running after completion count reached)
if completions *job.Spec.Completions {jm.recorder.Event( job, v1.EventTypeWarning, TooManySucceededPods , Too many succeeded pods running after completion count reached)
if complete {job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, , ))
now := metav1.Now()
job.Status.CompletionTime = now
forget := false
// no need to update the job if the status hasn t changed since last time
if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
job.Status.Active = active
job.Status.Succeeded = succeeded
job.Status.Failed = failed
if err := jm.updateHandler(job); err != nil {
return false, err
if jobHaveNewFailure !IsJobFinished(job) {
// returning an error will re-enqueue Job after the backoff period
return false, fmt.Errorf(failed pod(s) detected for job key %q , key)
forget = true
return forget, manageJobErr
}
从 Indexer 中查找指定的 Job 是否存在,如果不存在,则从 expectations 中删除该 job,流程结束返回 true。否则继续下面流程。
根据 JobCondition Complete or Failed 判断 Job 是否 Finished,如果 Finished,则流程结束返回 true,否则继续下面流程。
调用 SatisfiedExpectations,如果 ControlleeExpectations 中待 add 和 del 都 =0,或者 expectations 已经超过 5 分钟没更新过了,则返回 jobNeedsSync=true,表示需要进行一次 manageJob 了。
对于那些第一次启动的 jobs (StartTime==nil), 需要把设置 StartTime,并且如果 ActiveDeadlineSeconds 不为空,则经过 ActiveDeadlineSeconds 后再次把该 job 加入到 queue 中进行 sync。
获取该 job 管理的所有 pods,过滤出 activePods,计算出 actived,successed,failed pods 的数量。如果 failed job.Status.Failed,说明该 job 又有新 failed Pods 了,则 jobHaveNewFailure 为 true。
如果 jobHaveNewFailure,并且 queue 记录的该 job retry 次数加 1,比 job.Spec.BackoffLimit(默认为 6),则表示该 job BackoffLimitExceeded,jobFailed。如果 job StartTime 到现在为止的历时 =ActiveDeadlineSeconds,则表示该 job DeadlineExceeded,jobFailed。
如果 jobFailed,则用 sync.WaitGroup 并发等待删除所有的前面过滤出来的 activePods,删除成功,则 failed += acitve, active = 0, 并设置 Condition Failed 为 true。
如果 job not failed, jobNeedSync 为 true,并且 job 的 DeletionTimestamp 为空(没有标记为删除),则调用 manageJob 对 Job 管理的 pods 根据复杂的策略进行 add or del。
如果 job not failed 且 job.Spec.Completions 为 nil,表示 This type of job is complete when any pod exits with success。因此如果 succeeded 0 active == 0,则表示 job completed。
如果如果 job not failed 且 job.Spec.Completions 不为 nil,表示 This type of job signals success by having that number of successes。因此如果 succeeded = job.Spec.Completions,则表示 job completed。
如果 job completed,则更新其 Conditions Complete 为 true,并设置 CompletionTime。
接下来 invoke updateJobStatus 更新 etcd 中 job 状态,如果更新失败,则返回 false,该 job 将再次加入 queue。如果 jobHaveNewFailure 为 true,并且 Job Condition 显示该 Job not Finished,则返回 false,该 job 将再次加入 queue。
manageJob
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify activePods .
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
var activeLock sync.Mutex
active := int32(len(activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
if err != nil {utilruntime.HandleError(fmt.Errorf( Couldn t get key for job %#v: %v , job, err))
return 0, nil
var errCh chan error
if active parallelism {
diff := active - parallelism
errCh = make(chan error, diff)
jm.expectations.ExpectDeletions(jobKey, int(diff))
glog.V(4).Infof(Too many pods running job %q, need %d, deleting %d , jobKey, parallelism, diff)
// Sort the pods in the order such that not-ready ready, unscheduled
// scheduled, and pending running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(activePods))
active -= diff
wait := sync.WaitGroup{}
wait.Add(int(diff))
for i := int32(0); i diff; i++ {go func(ix int32) {defer wait.Done()
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {defer utilruntime.HandleError(err)
// Decrement the expected number of deletes because the informer won t observe this deletion
glog.V(2).Infof(Failed to delete %v, decrementing expectations for job %q/%q , activePods[ix].Name, job.Namespace, job.Name)
jm.expectations.DeletionObserved(jobKey)
activeLock.Lock()
active++
activeLock.Unlock()
errCh - err
}(i)
wait.Wait()} else if active parallelism {wantActive := int32(0)
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// once success, in which leave whatever is running, running.
if succeeded 0 {wantActive = active} else {wantActive = parallelism} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = *job.Spec.Completions - succeeded
if wantActive parallelism {
wantActive = parallelism
diff := wantActive - active
if diff 0 {utilruntime.HandleError(fmt.Errorf( More active than wanted: job %q, want %d, have %d , jobKey, wantActive, active))
diff = 0
jm.expectations.ExpectCreations(jobKey, int(diff))
errCh = make(chan error, diff)
glog.V(4).Infof(Too few pods running job %q, need %d, creating %d , jobKey, wantActive, diff)
active += diff
wait := sync.WaitGroup{}
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of slow start .
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff 0; batchSize = integer.Int32Min(2*batchSize, diff) {errorCount := len(errCh)
wait.Add(int(batchSize))
for i := int32(0); i batchSize; i++ {go func() {defer wait.Done()
err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
if err != nil errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
if err != nil {defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won t observe this pod
glog.V(2).Infof(Failed creation, decrementing expectations for job %q/%q , job.Namespace, job.Name)
jm.expectations.CreationObserved(jobKey)
activeLock.Lock()
active--
activeLock.Unlock()
errCh - err
wait.Wait()
// any skipped pods that we never attempted to start shouldn t be expected.
skippedPods := diff - batchSize
if errorCount len(errCh) skippedPods 0 {glog.V(2).Infof(Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q , skippedPods, job.Namespace, job.Name)
active -= skippedPods
for i := int32(0); i skippedPods; i++ {
// Decrement the expected number of creates because the informer won t observe this pod
jm.expectations.CreationObserved(jobKey)
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
diff -= batchSize
select {
case err := -errCh:
// all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
if err != nil {
return active, err
default:
return active, nil
}
如果 active job.Spec.Parallelism, 表示要 scale down:
计算 active 与 parallelism 的差值 diff,修改 ControllerExpectations 中该 job 的 dels 为 diff,表示要删除 diff 这么多的 pod。
计算 active 与 parallelism 的差值 diff,修改 ControllerExpectations 中该 job 的 dels 为 diff,表示要删除 diff 这么多的 pod。
将 activePods 中的 Pods 按照 not-ready ready, unscheduled scheduled, pending running 进行排序,确保先删除 stage 越早的 pods。
更新 active (active 减去 diff),用 sync.WaitGroup 并发等待删除 etcd 中那些 Pods。如果删除某个 Pod 失败,active 要加 1,expectations 中 dels 要减 1.
返回 active
如果 active job.Spec.Parallelism,表示要 scale up:
如果 job.Spec.Completions 为 nil,且 succeeded 大于 0,则 diff 设为 0;如果 job.Spec.Completions 为 nil,但 successed = 0,则 diff 为 parallelism-active;如果 job.Spec.Completions 不为 nil,则 diff 为 max(job.Spec.Completions – succeeded,parallelim) – active;
修改 ControllerExpectations 中该 job 的 adds 为 diff,表示要新增 diff 这么多的 pod。
更新 active (active 加上 diff),用 sync.WaitGroup 分批的创建 Pods,第一批创建 1 个 ( 代码写死 SlowStartInitialBatchSize = 1),第二批创建 2,然后 4,8,16… 这样下去,但是每次不能超过 diff 的值。每一批创建 pod 后,注意更新 diff 的值(减去 batchsize)。如果某一批创建过程 Pods 中存在失败情况,则更新 active 和 expectations 中 adds,且不进行后续未启动的批量创建 pods 行为。
如果 active == job.Spec.Parallelism,返回 active。
感谢各位的阅读,以上就是“Kubernetes Job Controller 怎么构造”的内容了,经过本文的学习后,相信大家对 Kubernetes Job Controller 怎么构造这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!