This article analyzes the main method of the external-provisioner component and its Leader election logic.
main Method and Leader Election Analysis
1. main Method Analysis
This part analyzes the main logic of the main method, as well as the component's EventHandlers: which objects the component lists/watches, how events for those objects are handled, and where the items in claimQueue and volumeQueue come from.
Main logic of the main method
The main logic of the main method is:
(1) parse the startup flags;
(2) build the clientset from the configuration;
(3) build the gRPC client;
(4) probe over gRPC (checking whether the cephcsi-rbd service is ready) until the probe succeeds;
(5) obtain the driver name and capabilities over gRPC;
(6) build informers from the clientset;
(7) construct the provisionController object;
(8) define the run method (which includes provisionController.Run);
(9) depending on the --enable-leader-election startup flag, decide whether to enable Leader election: when it is disabled, run the run method directly; when it is enabled, call le.Run().
func main() {
	var config *rest.Config
	var err error
	flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
		"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))
	klog.InitFlags(nil)
	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
	flag.Set("logtostderr", "true")
	flag.Parse()

	if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
		klog.Fatal(err)
	}

	if *showVersion {
		fmt.Println(os.Args[0], version)
		os.Exit(0)
	}
	klog.Infof("Version: %s", version)

	// get the KUBECONFIG from env if specified (useful for local/debug cluster)
	kubeconfigEnv := os.Getenv("KUBECONFIG")

	if kubeconfigEnv != "" {
		klog.Infof("Found KUBECONFIG environment variable set, using that..")
		kubeconfig = &kubeconfigEnv
	}

	if *master != "" || *kubeconfig != "" {
		klog.Infof("Either master or kubeconfig specified. building kube config from that..")
		config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
	} else {
		klog.Infof("Building kube configs for running in cluster...")
		config, err = rest.InClusterConfig()
	}
	if err != nil {
		klog.Fatalf("Failed to create config: %v", err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create client: %v", err)
	}

	// snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1beta1Client
	snapClient, err := snapclientset.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create snapshot client: %v", err)
	}

	// The controller needs to know what the server version is because out-of-tree
	// provisioners aren't officially supported until 1.5
	serverVersion, err := clientset.Discovery().ServerVersion()
	if err != nil {
		klog.Fatalf("Error getting server version: %v", err)
	}

	metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)

	grpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
	if err != nil {
		klog.Error(err.Error())
		os.Exit(1)
	}

	// Probe in a loop until the CSI driver (here the cephcsi-rbd service) is ready
	err = ctrl.Probe(grpcClient, *operationTimeout)
	if err != nil {
		klog.Error(err.Error())
		os.Exit(1)
	}

	// Get the driver name from the ceph-csi component
	provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
	if err != nil {
		klog.Fatalf("Error getting CSI driver name: %s", err)
	}
	klog.V(2).Infof("Detected CSI driver %s", provisionerName)
	metricsManager.SetDriverName(provisionerName)
	metricsManager.StartMetricsEndpoint(*metricsAddress, *metricsPath)

	// Get the driver capabilities from the ceph-csi component
	pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
	if err != nil {
		klog.Fatalf("Error getting CSI driver capabilities: %s", err)
	}

	// Generate a unique ID for this provisioner
	timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
	identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName

	// Start building the informers
	factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)

	// -------------------------------
	// Listers
	// Create informer to prevent hit the API server for all resource request
	scLister := factory.Storage().V1().StorageClasses().Lister()
	claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

	var csiNodeLister storagelistersv1beta1.CSINodeLister
	var nodeLister v1.NodeLister
	if ctrl.SupportsTopology(pluginCapabilities) {
		csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
		nodeLister = factory.Core().V1().Nodes().Lister()
	}

	// -------------------------------
	// PersistentVolumeClaims informer
	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
	claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
	claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

	// Setup options
	provisionerOptions := []func(*controller.ProvisionController) error{
		controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
		controller.FailedProvisionThreshold(0),
		controller.FailedDeleteThreshold(0),
		controller.RateLimiter(rateLimiter),
		controller.Threadiness(int(*workerThreads)),
		controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
		controller.ClaimsInformer(claimInformer),
	}

	translator := csitrans.New()

	supportsMigrationFromInTreePluginName := ""
	if translator.IsMigratedCSIDriverByName(provisionerName) {
		supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)
		if err != nil {
			klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
		}
		klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)
		provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
	}

	// Create the provisioner: it implements the Provisioner interface expected by
	// the controller
	csiProvisioner := ctrl.NewCSIProvisioner(
		clientset,
		*operationTimeout,
		identity,
		*volumeNamePrefix,
		*volumeNameUUIDLength,
		grpcClient,
		snapClient,
		provisionerName,
		pluginCapabilities,
		controllerCapabilities,
		supportsMigrationFromInTreePluginName,
		*strictTopology,
		translator,
		scLister,
		csiNodeLister,
		nodeLister,
		claimLister,
		*extraCreateMetadata,
	)

	provisionController = controller.NewProvisionController(
		clientset,
		provisionerName,
		csiProvisioner,
		serverVersion.GitVersion,
		provisionerOptions...,
	)

	csiClaimController := ctrl.NewCloningProtectionController(
		clientset,
		claimLister,
		claimInformer,
		claimQueue,
	)

	// Main loop function
	run := func(context.Context) {
		stopCh := context.Background().Done()
		factory.Start(stopCh)
		cacheSyncResult := factory.WaitForCacheSync(stopCh)
		for _, v := range cacheSyncResult {
			if !v {
				klog.Fatalf("Failed to sync Informers!")
			}
		}

		// Run the two controllers; the rest of this analysis focuses on provisionController
		go csiClaimController.Run(int(*finalizerThreads), stopCh)
		provisionController.Run(wait.NeverStop)
	}

	// Leader election
	if !*enableLeaderElection {
		run(context.TODO())
	} else {
		// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller
		// to preserve backwards compatibility
		lockName := strings.Replace(provisionerName, "/", "-", -1)

		// Use an endpoints or leases resource object to elect the leader
		var le leaderElection
		if *leaderElectionType == "endpoints" {
			klog.Warning("The 'endpoints' leader election type is deprecated and will be removed in a future release. Use '--leader-election-type=leases' instead.")
			le = leaderelection.NewLeaderElectionWithEndpoints(clientset, lockName, run)
		} else if *leaderElectionType == "leases" {
			le = leaderelection.NewLeaderElection(clientset, lockName, run)
		} else {
			klog.Error("--leader-election-type must be either 'endpoints' or 'leases'")
			os.Exit(1)
		}

		if *leaderElectionNamespace != "" {
			le.WithNamespace(*leaderElectionNamespace)
		}

		// Run the leader election loop
		if err := le.Run(); err != nil {
			klog.Fatalf("failed to initialize leader election: %v", err)
		}
	}
}
controller.NewProvisionController
The part to focus on here is the EventHandlers, which define which objects the component lists/watches, how events for those objects are handled, and where the items in claimQueue and volumeQueue come from.
claimHandler
As the code shows, claimQueue is fed by the add and update events of PVC objects (how claimQueue is processed was covered in external-provisioner source code analysis (1): main processing logic; refer back to it if needed).
...
// PersistentVolumeClaims
claimHandler := cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { controller.enqueueClaim(obj) },
	UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
	DeleteFunc: func(obj interface{}) {
		// NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
		// or it's not in claimsInProgress and then we don't care
	},
}

if controller.claimInformer != nil {
	controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
} else {
	controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
	controller.claimInformer.AddEventHandler(claimHandler)
}
...

// enqueueClaim takes an obj and converts it into UID that is then put onto claim work queue.
func (ctrl *ProvisionController) enqueueClaim(obj interface{}) {
	uid, err := getObjectUID(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	if ctrl.claimQueue.NumRequeues(uid) == 0 {
		ctrl.claimQueue.Add(uid)
	}
}
volumeHandler
As the code shows, volumeQueue is fed by the add and update events of PV objects (how volumeQueue is processed was also covered in external-provisioner source code analysis (1): main processing logic).
...
// PersistentVolumes
volumeHandler := cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { controller.enqueueVolume(obj) },
	UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
	DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
}

if controller.volumeInformer != nil {
	controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
} else {
	controller.volumeInformer = informer.Core().V1().PersistentVolumes().Informer()
	controller.volumeInformer.AddEventHandler(volumeHandler)
}
...

// enqueueVolume takes an obj and converts it into a namespace/name string which
// is then put onto the given work queue.
func (ctrl *ProvisionController) enqueueVolume(obj interface{}) {
	var key string
	var err error
	if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	// Re-Adding is harmless but try to add it to the queue only if it is not
	// already there, because if it is already there we *must* be retrying it
	if ctrl.volumeQueue.NumRequeues(key) == 0 {
		ctrl.volumeQueue.Add(key)
	}
}

// forgetVolume Forgets an obj from the given work queue, telling the queue to
// stop tracking its retries because e.g. the obj was deleted
func (ctrl *ProvisionController) forgetVolume(obj interface{}) {
	var key string
	var err error
	if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	ctrl.volumeQueue.Forget(key)
	ctrl.volumeQueue.Done(key)
}
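As a reminder of how items put onto these queues are eventually consumed, below is a minimal, generic sketch of a worker loop over a rate-limited workqueue such as claimQueue or volumeQueue. It is not external-provisioner code: RunWorker and syncFunc are hypothetical names, and the real per-item sync logic is the one described in external-provisioner source code analysis (1).

package queueworker

import (
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)

// RunWorker drains the queue until it is shut down, retrying failed items
// with the backoff supplied by the queue's rate limiter.
func RunWorker(queue workqueue.RateLimitingInterface, syncFunc func(key interface{}) error) {
	for {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		func() {
			// Done must always be called so that the key can be processed again later.
			defer queue.Done(key)
			if err := syncFunc(key); err != nil {
				// Failure: requeue with exponential backoff from the rate limiter.
				klog.Errorf("sync of %v failed, requeueing: %v", key, err)
				queue.AddRateLimited(key)
				return
			}
			// Success: drop the retry history for this key.
			queue.Forget(key)
		}()
	}
}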
2. Leader Election Analysis
In Go, the k8s client-go package wraps the Leader election functionality and supports three kinds of lock resources: endpoints, configmap and lease, which makes it convenient to use.
Basic principle of Leader election
The basic principle of Leader election is to implement a distributed lock on top of a Kubernetes configmap, endpoints or lease resource: the node that acquires the lock becomes the leader and then renews it periodically. The other processes keep trying to acquire the lock; if they fail, they wait for the next loop iteration and try again. When the leader dies and its lease expires, another node becomes the new leader.
Acquiring the lock simply means successfully writing the process's information (such as a unique process identifier) into the configmap, endpoints or lease object; renewing means periodically updating the lock's renew time in that resource to extend the lease.
When multiple processes try to take the lock (i.e. update the lock resource) at the same time, the apiserver guarantees that the update is atomic: by comparing the resourceVersion (whose value ultimately comes from etcd's modifiedindex), only one process can modify the object successfully, and therefore only one process can acquire the lock.
An example lock object looks like this:
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2020-08-21T11:56:46Z"
  name: rbd-csi-ceph-com
  namespace: default
  resourceVersion: "69642798"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/rbd-csi-ceph-com
  uid: c9a7ea00-c000-4c5c-b90f-d0e7c85240ca
spec:
  acquireTime: "2020-08-21T11:56:46.907075Z"
  holderIdentity: cld-dnode3-1091-i-nease-net
  leaseDurationSeconds: 15
  leaseTransitions: 0
  renewTime: "2020-09-07T02:38:24.587170Z"
Here holderIdentity records the identity of the process holding the lock, and renewTime records when the lock was last renewed.
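To make the mechanism concrete, here is a minimal, self-contained sketch of acquiring such a Lease lock directly with client-go. It is not code from external-provisioner; the lock name, namespace, durations and identity are placeholder values. As shown below in le.Run(), external-provisioner wraps essentially this machinery behind its own leaderElection helper.

package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatal(err)
	}
	clientset := kubernetes.NewForConfigOrDie(cfg)

	// The candidate's identity; the hostname is a common choice.
	hostname, err := os.Hostname()
	if err != nil {
		klog.Fatal(err)
	}

	// The Lease object acting as the distributed lock (placeholder name/namespace).
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      "rbd-csi-ceph-com",
			Namespace: "default",
		},
		Client:     clientset.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: hostname},
	}

	// RunOrDie blocks: it keeps trying to acquire the lock, runs OnStartedLeading
	// once it wins, and keeps renewing the lease until it loses it.
	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.Info("became leader, starting the real work")
				// ... run the component's main loop here ...
			},
			OnStoppedLeading: func() {
				klog.Fatal("lost the lease, exiting")
			},
			OnNewLeader: func(identity string) {
				klog.Infof("current leader: %s", identity)
			},
		},
	})
}

Exiting in OnStoppedLeading, as le.Run() below also does, ensures that a process which loses the lease restarts and re-enters the election instead of continuing to act as leader.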
Leader election in external-provisioner
As the main method code shows, the external-provisioner component only supports the endpoints and lease lock resources, and the endpoints lock will be dropped in a later release, so the lease lock is the recommended choice.
The high-availability leader election logic of external-provisioner is similar to that of Kubernetes components such as kube-controller-manager and kube-scheduler.
Outline of the process:
(1) at startup, the component periodically tries to acquire the lease lock in a loop; on success it becomes the leader and the call returns, otherwise the call keeps blocking;
(2) once the lease lock is acquired and the election is won, the component runs its main processing logic;
(3) after winning the election, it keeps renewing the lease periodically to hold on to leadership.
The code is analyzed below.
le.Run()
This method runs when the --enable-leader-election startup flag is true. Its main logic:
(1) define the leaderConfig structure;
(2) call leaderelection.RunOrDie for the rest of the election handling.
func (l *leaderElection) Run() error {
	if l.identity == "" {
		id, err := defaultLeaderElectionIdentity()
		if err != nil {
			return fmt.Errorf("error getting the default leader identity: %v", err)
		}
		l.identity = id
	}
	if l.namespace == "" {
		l.namespace = inClusterNamespace()
	}

	broadcaster := record.NewBroadcaster()
	broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)})
	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))})

	rlConfig := resourcelock.ResourceLockConfig{
		Identity:      sanitizeName(l.identity),
		EventRecorder: eventRecorder,
	}

	lock, err := resourcelock.New(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig)
	if err != nil {
		return err
	}

	leaderConfig := leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: l.leaseDuration,
		RenewDeadline: l.renewDeadline,
		RetryPeriod:   l.retryPeriod,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.V(2).Info("became leader, starting")
				l.runFunc(ctx)
			},
			OnStoppedLeading: func() {
				klog.Fatal("stopped leading")
			},
			OnNewLeader: func(identity string) {
				klog.V(3).Infof("new leader detected, current leader: %s", identity)
			},
		},
	}

	leaderelection.RunOrDie(context.TODO(), leaderConfig)
	return nil // should never reach here
}
leaderelection.RunOrDie()
Main logic:
(1) call le.acquire() to try to become the leader (acquire periodically loops trying to get the lease lock; on success it becomes the leader and returns, otherwise it keeps blocking);
(2) once the election is won, run the run method;
(3) call the renewal method le.renew() to keep renewing the lease in a loop.
// RunOrDie starts a client with the provided config or panics if the config
// fails to validate.
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	if lec.WatchDog != nil {
		lec.WatchDog.SetLeaderElection(le)
	}
	le.Run(ctx)
}

// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
	defer func() {
		runtime.HandleCrash()
		le.config.Callbacks.OnStoppedLeading()
	}()
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}

// acquire keeps looping to get the lease lock; once it succeeds, the caller becomes leader and it returns
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew()
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}

// renew keeps renewing the lease in a loop
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			done := make(chan bool, 1)
			go func() {
				defer close(done)
				done <- le.tryAcquireOrRenew()
			}()

			select {
			case <-timeoutCtx.Done():
				return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
			case result := <-done:
				return result, nil
			}
		}, timeoutCtx.Done())

		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())

	// if we hold the lease, give it up
	if le.config.ReleaseOnCancel {
		le.release()
	}
}
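tryAcquireOrRenew itself is not reproduced in this article, so the following is a heavily simplified paraphrase of what it does for a lease lock, not the real client-go implementation (which also tracks the observed record, lease transitions, and the endpoints/configmap lock types). The function name tryAcquireOrRenewLease is made up, and the sketch assumes a client-go version whose API calls take a context.

package leaselock

import (
	"context"
	"time"

	coordv1 "k8s.io/api/coordination/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// tryAcquireOrRenewLease returns true if identity holds the lease after the call.
func tryAcquireOrRenewLease(ctx context.Context, cs kubernetes.Interface,
	ns, name, identity string, leaseDuration time.Duration) bool {

	now := metav1.NewMicroTime(time.Now())
	leases := cs.CoordinationV1().Leases(ns)

	lease, err := leases.Get(ctx, name, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		// Nobody holds the lock yet: try to create it with our identity.
		seconds := int32(leaseDuration.Seconds())
		lease = &coordv1.Lease{
			ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns},
			Spec: coordv1.LeaseSpec{
				HolderIdentity:       &identity,
				LeaseDurationSeconds: &seconds,
				AcquireTime:          &now,
				RenewTime:            &now,
			},
		}
		_, err = leases.Create(ctx, lease, metav1.CreateOptions{})
		return err == nil
	} else if err != nil {
		return false
	}

	holder := ""
	if lease.Spec.HolderIdentity != nil {
		holder = *lease.Spec.HolderIdentity
	}
	expired := lease.Spec.RenewTime == nil ||
		lease.Spec.RenewTime.Add(leaseDuration).Before(time.Now())

	// Someone else holds a still-valid lease: give up for this round.
	if holder != identity && !expired {
		return false
	}

	// Either we already hold the lease (renew) or it has expired (take over).
	lease.Spec.HolderIdentity = &identity
	lease.Spec.RenewTime = &now
	// The update is optimistic: if another candidate modified the object in the
	// meantime, the apiserver rejects this call with a resourceVersion conflict.
	_, err = leases.Update(ctx, lease, metav1.UpdateOptions{})
	if apierrors.IsConflict(err) {
		return false
	}
	return err == nil
}

The decisive step is the final Update: because it is optimistic, two candidates that read the same Lease cannot both write it back, and the apiserver's resourceVersion check provides exactly the atomicity guarantee described earlier, so only one process ends up as leader.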