如何解析k8s中的Informer机制

86次阅读
没有评论

共计 6163 个字符,预计需要花费 16 分钟才能阅读完成。

如何解析 k8s 中的 Informer 机制,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

Informer 机制架构设计总览

下面是我根据理解画的一个数据流转图, 从全局视角看一下数据的整体走向是怎么样的。

其中虚线的表示的是代码中的方法。

首先讲一个结论:

通过 Informer 机制获取数据的情况下,在初始化的时候会从 Kubernetes API Server 获取对应 Resource 的全部 Object,后续只会通过 Watch 机制接收 API Server 推送过来的数据,不会再主动从 API Server 拉取数据,直接使用本地缓存中的数据以减少 API Server 的压力。

Watch 机制基于 HTTP 的 Chunk 实现,维护一个长连接,这是一个优化点,减少请求的数据量。第二个优化点是 SharedInformer, 它可以让同一种资源使用的是同一个 Informer,例如 v1 版本的 Deployment 和 v1beta1 版本的 Deployment 同时存在的时候,共享一个 Informer。

上面图中可以看到 Informer 分为三个部分,可以理解为三大逻辑。

其中 Reflector 主要是把从 API Server 数据获取到的数据放到 DeltaFIFO 队列中,充当生产者角色。

SharedInformer 主要是从 DeltaFIFIO 队列中获取数据并分发数据,充当消费者角色。

最后 Indexer 是作为本地缓存的存储组件存在。

Reflector 理解

Reflector 中主要看 Run、ListAndWatch、watchHandler 三个地方就足够了。

源码位置是 tools/cache/reflector.go

// Ruvn starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
// 开始时执行 Run,上一层调用的地方是  controller.go 中的 Run 方法
func (r *Reflector) Run(stopCh  -chan struct{}) {
 
 klog.V(3).Infof(Starting reflector %v (%s) from %s , r.expectedTypeName, r.resyncPeriod, r.name)
 wait.Until(func() {
 // 启动后执行一次 ListAndWatch
 if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err)
 }
 }, r.period, stopCh)
// and then use the resource version to watch.
// It returns error if ListAndWatch didn t even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh  -chan struct{}) error {
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
 // list request will return the full response.
 pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
// 这里是调用了各个资源中的 ListFunc 函数, 例如如果 v1 版本的 Deployment
// 则调用的是 informers/apps/v1/deployment.go 中的 ListFunc
 return r.listerWatcher.List(opts)
 }))
 if r.WatchListPageSize != 0 {
 pager.Pa1geSize = r.WatchListPageSize
 }
 // Pager falls back to full list if paginated list calls fail due to an  Expired

数据的生产就结束了,就两点:

初始化时从 API Server 请求数据

监听后续从 Watch 推送来的数据

DeltaFIFO 理解

先看一下数据结构:

type DeltaFIFO struct { items map[string]Deltas
 queue []string
type Delta struct {
 Type DeltaType
 Object interface{}
type Deltas []Delta

 Added DeltaType =  Added  Updated DeltaType =  Updated  Deleted DeltaType =  Deleted  Sync DeltaType =  Sync )

其中 queue 存储的是 Object 的 id, 而 items 存储的是以 ObjectID 为 key 的这个 Object 的事件列表,

可以想象到是这样的一个数据结构, 左边是 Key, 右边是一个数组对象, 其中每个元素都是由 type 和 obj 组成.

DeltaFIFO 顾名思义存放 Delta 数据的先入先出队列,相当于一个数据的中转站,将数据从一个地方转移另一个地方。

主要看的内容是 queueActionLocked、Pop、Resync

queueActionLocked 方法:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { newDeltas := append(f.items[id], Delta{actionType, obj})
 // 去重处理
 newDeltas = dedupDeltas(newDeltas)
 if len(newDeltas)   0 {
 ... 
 //pop 消息
 
 f.cond.Broadcast()
 ...
 return nil
}

Pop 方法:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock()
 defer f.lock.Unlock()
 for { for len(f.queue) == 0 { // 阻塞   直到调用了 f.cond.Broadcast()
 f.cond.Wait()
 }
// 取出第一个元素
 id := f.queue[0]
 f.queue = f.queue[1:]
 ...
 item, ok := f.items[id]
 delete(f.items, id)
 // 这个 process 可以在 controller.go 中的 processLoop() 找到
 // 初始化是在 shared_informer.go 的 Run
 // 最终执行到 shared_informer.go 的 HandleDeltas 方法
 err := process(item)
 // 如果处理出错了重新放回队列中
 if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item)
 err = e.Err
 }
 ...
 }
}

Resync 机制:

小总结:每次从本地缓存 Indexer 中获取数据重新放到 DeltaFIFO 中执行任务逻辑。

启动的 Resync 地方是 reflector.go 的 resyncChan() 方法,在 reflector.go 的 ListAndWatch 方法中的调用开始定时执行。

go func() {
 // 启动定时任务
 resyncCh, cleanup := r.resyncChan()
 defer func() { cleanup() // Call the last one written into cleanup
 }()
 for {
 select {
 case  -resyncCh:
 case  -stopCh:
 return
 case  -cancelCh:
 return
 }
 // 定时执行   调用会执行到 delta_fifo.go 的 Resync() 方法
 if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof(%s: forcing resync , r.name)
 if err := r.store.Resync(); err != nil {
 resyncerrc  - err
 return
 }
 }
 cleanup()
 resyncCh, cleanup = r.resyncChan()
 }
 }()
func (f *DeltaFIFO) Resync() error {
 ...
// 从缓存中获取到所有的 key
 keys := f.knownObjects.ListKeys()
 for _, k := range keys { if err := f.syncKeyLocked(k); err != nil {
 return err
 }
 }
 return nil

func (f *DeltaFIFO) syncKeyLocked(key string) error {  // 获缓存拿到对应的 Object  obj, exists, err := f.knownObjects.GetByKey(key)  ...  // 放入到队列中执行任务逻辑  if err := f.queueActionLocked(Sync, obj); err != nil { return fmt.Errorf( couldn t queue object: %v , err)  }  return nil }

SharedInformer 消费消息理解

主要看 HandleDeltas 方法就好,消费消息然后分发数据并且存储数据到缓存的地方

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock()
 defer s.blockDeltas.Unlock()
 // from oldest to newest
 for _, d := range obj.(Deltas) {
 
 switch d.Type {
 case Sync, Added, Updated:
 ...
 // 查一下是否在 Indexer 缓存中   如果在缓存中就更新缓存中的对象
 if old, exists, err := s.indexer.Get(d.Object); err == nil   exists { if err := s.indexer.Update(d.Object); err != nil {
 return err
 }
 // 把数据分发到 Listener
 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
 } else {
 // 没有在 Indexer 缓存中   把对象插入到缓存中
 if err := s.indexer.Add(d.Object); err != nil {
 return err
 }
 s.processor.distribute(addNotification{newObj: d.Object}, isSync)
 }
 ...
 }
 }
 return nil
}

Indexer 理解

这块不会讲述太多内容,因为我认为 Informer 机制最主要的还是前面数据的流转,当然这并不代表数据存储不重要,而是先理清楚整体的思路,后续再详细更新存储的部分。

Indexer 使用的是 threadsafe_store.go 中的 threadSafeMap 存储数据,是一个线程安全并且带有索引功能的 map, 数据只会存放在内存中,每次涉及操作都会进行加锁。

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
 lock sync.RWMutex
 items map[string]interface{}
 indexers Indexers
 indices Indices
}

Indexer 还有一个索引相关的内容就暂时不展开讲述。

Example 代码

-------------
package main
import (
  flag 
  fmt 
  path/filepath 
  time 
 v1  k8s.io/api/apps/v1 
  k8s.io/apimachinery/pkg/labels 
  k8s.io/client-go/informers 
  k8s.io/client-go/kubernetes 
  k8s.io/client-go/rest 
  k8s.io/client-go/tools/cache 
  k8s.io/client-go/tools/clientcmd 
  k8s.io/client-go/util/homedir 
func main() {
 var err error
 var config *rest.Config
 var kubeconfig *string
 if home := homedir.HomeDir(); home != 

以上示例代码中程序启动后会拉取一次 Deployment 数据,并且拉取数据完成后从本地缓存中 List 一次 default 命名空间的 Deployment 资源并打印,然后每 60 秒 Resync 一次 Deployment 资源。

QA

为什么需要 Resync?

在本周有同学提出一个,我看到这个问题后也感觉挺奇怪的,因为 Resync 是从本地缓存的数据缓存到本地缓存 (从开始到结束来说是这样), 为什么需要把数据拿出来又走一遍流程呢?当时钻牛角尖也是想不明白,后来换个角度想就知道了。

数据从 API Server 过来并且经过处理后放到缓存中,但数据并不一定就可以正常处理,也就是说可能报错了,而这个 Resync 相当于一个重试的机制。

可以尝试实践一下: 部署有状态服务,存储使用 LocalPV(也可以换成自己熟悉的), 这时候 pod 会由于存储目录不存在而启动失败. 然后在 pod 启动失败后再创建好对应的目录,过一会 pod 就启动成功了。

看完上述内容,你们掌握如何解析 k8s 中的 Informer 机制的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注丸趣 TV 行业资讯频道,感谢各位的阅读!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计6163字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)