apiserver的list

69次阅读
没有评论

共计 10083 个字符,预计需要花费 26 分钟才能阅读完成。

本篇内容主要讲解“apiserver 的 list-watch 怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“apiserver 的 list-watch 怎么使用”吧!

0. list-watch 的需求

上图是一个典型的 Pod 创建过程,在这个过程中,每次当 kubectl 创建了 ReplicaSet 对象后,controller-manager 都是通过 list-watch 这种方式得到了最新的 ReplicaSet 对象,并执行自己的逻辑来创建 Pod 对象。其他的几个组件,Scheduler/Kubelet 也是一样,通过 list-watch 得知变化并进行处理。这是组件的处理端代码:

c.NodeLister.Store, c.nodePopulator = framework.NewInformer( c.createNodeLW(), ...(1)
 api.Node{}, ...(2)
 0, ...(3)
 framework.ResourceEventHandlerFuncs{ ...(4)
 AddFunc: c.addNodeToCache, ...(5)
 UpdateFunc: c.updateNodeInCache,
 DeleteFunc: c.deleteNodeFromCache,
)

其中 (1) 是 list-watch 函数,(4)(5)则是相应事件触发操作的入口。

list-watch 操作需要做这么几件事:

由组件向 apiserver 而不是 etcd 发起 watch 请求,在组件启动时就进行订阅,告诉 apiserver 需要知道什么数据发生变化。Watch 是一个典型的发布 - 订阅模式。

组件向 apiserver 发起的 watch 请求是可以带条件的,例如,scheduler 想要 watch 的是所有未被调度的 Pod,也就是满足 Pod.destNode= 的 Pod 来进行调度操作;而 kubelet 只关心自己节点上的 Pod 列表。apiserver 向 etcd 发起的 watch 是没有条件的,只能知道某个数据发生了变化或创建、删除,但不能过滤具体的值。也就是说对象数据的条件过滤必须在 apiserver 端而不是 etcd 端完成。

list 是 watch 失败,数据太过陈旧后的弥补手段,这方面详见 基于 list-watch 的 Kubernetes 异步事件处理框架详解 - 客户端部分。list 本身是一个简单的列表操作,和其它 apiserver 的增删改操作一样,不再多描述细节。

1. watch 的 API 处理

既然 watch 本身是一个 apiserver 提供的 http restful 的 API,那么就按照 API 的方式去阅读它的代码,按照 apiserver 的基础功能实现一文所描述,我们来看它的代码,

关键的处理 API 注册代码 pkg/apiserver/api_installer.go

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage,...
 lister, isLister := storage.(rest.Lister)
 watcher, isWatcher := storage.(rest.Watcher) ...(1)
... 
 case  LIST : // List all resources of a kind. ...(2)
 doc :=  list objects of kind   + kind
 if hasSubresource {
 doc =  list   + subresource +   of objects of kind   + kind
 handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) ...(3)

一个 rest.Storage 对象会被转换为 watcher 和 lister 对象

提供 list 和 watch 服务的入口是同一个,在 API 接口中是通过 GET /pods?watch=true 这种方式来区分是 list 还是 watch

API 处理函数是由 lister 和 watcher 经过 ListResource()合体后完成的。

那么就看看 ListResource()的具体实现吧,/pkg/apiserver/resthandler.go

func ListResource(r rest.Lister, rw rest.Watcher,... {if (opts.Watch || forceWatch)   rw != nil {watcher, err := rw.Watch(ctx,  opts) ...(1)
 ....
 serveWatch(watcher, scope, req, res, timeout)
 return
 result, err := r.List(ctx,  opts) ...(2) 
 write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)

每次有一个 watch 的 url 请求过来,都会调用 rw.Watch()创建一个 watcher,好吧这里的名字和上面那一层的名字重复了,但我们可以区分开,然后使用 serveWatch()来处理这个请求。watcher 的生命周期是每个 http 请求的,这一点非常重要。

list 在这里是另外一个分支,和 watch 分别处理,可以忽略。

响应 http 请求的过程 serveWatch()的代码在 /pkg/apiserver/watch.go 里面

func serveWatch(watcher watch.Interface... {server.ServeHTTP(res.ResponseWriter, req.Request)
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 for {
 select {case event, ok :=  -s.watching.ResultChan():
 obj := event.Object
 if err := s.embeddedEncoder.EncodeToStream(obj, buf); 
}

这段的操作基本毫无技术含量,就是从 watcher 的结果 channel 中读取一个 event 对象,然后持续不断的编码写入到 http response 的流当中。

这是整个过程的图形化描述:

所以,我们的问题就回到了

watcher 这个对象,严格来说是 watch.Interface 的对象,位置在 pkg/watch/watch.go 中,是怎么被创建出来的?

这个 watcher 对象是怎么从 etcd 中获得变化的数据的?又是怎么过滤条件的?

2. 在代码迷宫中追寻 watcher

回到上面的代码追踪过程来看,watcher(watch.Interface)对象是被 Rest.Storage 对象创建出来的。从上一篇 apiserver 的基础功能实现 可以知道,所有的 Rest.Storage 分两层,一层是每个对象自己的逻辑,另一层则是通过通用的操作来搞定,像 watch 这样的操作应该是通用的,所以我们看这个源代码

/pkg/registry/generic/registry/store.go

func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) {return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
func (e *Store) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {return e.Storage.Watch(ctx, key, resourceVersion, filterFunc) ...(1)
 return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc)
}

果然,我们在 (1) 这里找到了生成 Watch 的函数,但这个工作是由 e.Storage 来完成的,所以我们需要找一个具体的 Storage 的生成过程,以 Pod 为例子

/pkg/registry/pod/etcd/etcd.go

func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
 prefix :=  /pods 
 storageInterface := opts.Decorator(opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Pods),  api.Pod{}, prefix, pod.Strategy, newListFunc) ...(1)
 store :=  registry.Store{
 ...
 Storage: storageInterface, ...(2)
 return PodStorage{Pod:  REST{store, proxyTransport}, ...(3)

这 (1) 就是 Storage 的生成现场,传入的参数包括了一个缓存 Pod 的数量。(2)(3)是和上面代码的连接点。那么现在问题就转化为追寻 Decorator 这个东西具体是怎么生成的,需要重复刚才的过程,往上搜索 opts 是怎么搞进来的。

/pkg/master/master.go – GetRESTOptionsOrDie()

/pkg/genericapiserver/genericapiserver.go – StorageDecorator()

/pkg/registry/generic/registry/storage_factory.go – StorageWithCacher()

/pkg/storage/cacher.go

OK,这样我们就来到正题,一个具体的 watch 缓存的实现了!

把上面这个过程用一幅图表示:

3. watch 缓存的具体实现

看代码,首要看的是数据结构,以及考虑这个数据结构和需要解决的问题之间的关系。

3.1 Cacher(pkg/storage/cacher.go)

对于 cacher 这结构来说,我们从外看需求,可以知道这是一个 Storage,用于提供某个类型的数据,例如 Pod 的增删改查请求,同时它又用于 watch,用于在 client 端需要对某个 key 的变化感兴趣时,创建一个 watcher 来源源不断的提供新的数据给客户端。

那么 cacher 是怎么满足这些需求的呢?答案就在它的结构里面:

type Cacher struct {
 // Underlying storage.Interface.
 storage Interface
 //  sliding window  of recent changes of objects and the current state.
 watchCache *watchCache
 reflector *cache.Reflector
 // Registered watchers.
 watcherIdx int
 watchers map[int]*cacheWatcher
}

略去里面的锁(在看代码的时候一开始要忽略锁的存在,锁是后期为了避免破坏数据再加上去的,不影响数据流),略去里面的一些非关键的成员,现在我们剩下这 3 段重要的成员,其中

storage 是连接 etcd 的,也就是背后的裸存储

watchCache 并不仅仅是和注释里面说的那样,是个滑动窗口,里面存储了所有数据 + 滑动窗口

watchers 这是为每个请求创建的 struct,每个 watch 的 client 上来后都会被创建一个,所以这里有个 map

当然,这 3 个成员的作用是我看了所有代码后,总结出来的,一开始读代码时不妨先在脑子里面有个定位,然后在看下面的方法时不断修正这个定位。那么,接下来就看看具体的方法是怎么让数据在这些结构里面流动的吧!

初始化方法

func NewCacherFromConfig(config CacherConfig) *Cacher { 
 cacher.startCaching(stopCh)
func (c *Cacher) startCaching(stopChannel  -chan struct{}) {if err := c.reflector.ListAndWatch(stopChannel); err != nil {glog.Errorf( unexpected ListAndWatch error: %v , err)
}

其他的部分都是陈词滥调,只有 startCaching()这段有点意思,这里启动一个 go 协程,最后启动了 c.reflector.ListAndWatch()这个方法,如果对 k8s 的基本有了解的话,这个其实就是一个把远端数据源源不断的同步到本地的方法,那么数据落在什么地方呢?往上看可以看到

reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),

也就是说从创建 cacher 的实例开始,就会从 etcd 中把所有 Pod 的数据同步到 watchCache 里面来。这也就印证了 watchCache 是数据从 etcd 过来的第一站。

增删改方法

func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {return c.storage.Create(ctx, key, obj, out, ttl)
}

大部分方法都很无聊,就是短路到底层的 storage 直接执行。

Watch 方法

// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
 watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
 c.watchers[c.watcherIdx] = watcher
 c.watcherIdx++
 return watcher, nil
}

这里的逻辑就比较清晰,首先从 watchCache 中拿到从某个 resourceVersion 以来的所有数据——initEvents,然后用这个数据创建了一个 watcher 返回出去为某个客户端提供服务。

List 方法

// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {filterFunc := filterFunction(key, c.keyFunc, filter)
 objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
 if err != nil {return fmt.Errorf( failed to wait for fresh list: %v , err)
 for _, obj := range objs {if filterFunc(object) {listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
}

从这段代码中我们可以看出 2 件事,一是 list 的数据都是从 watchCache 中获取的,二是获取后通过 filterFunc 过滤了一遍然后返回出去。

3.2 WatchCache(pkg/storage/watch_cache.go)

这个结构应该是缓存的核心结构,从上一层的代码分析中我们已经知道了对这个结构的需求,包括存储所有这个类型的数据,包括当有新的数据过来时把数据扔到 cacheWatcher 里面去,总之,提供 List 和 Watch 两大输出。

type watchCache struct {
 // cache is used a cyclic buffer - its first element (with the smallest
 // resourceVersion) is defined by startIndex, its last element is defined
 // by endIndex (if cache is full it will be startIndex + capacity).
 // Both startIndex and endIndex can be greater than buffer capacity -
 // you should always apply modulo capacity to get an index in cache array.
 cache []watchCacheElement
 startIndex int
 endIndex int
 // store will effectively support LIST operation from the  end of cache
 // history  i.e. from the moment just after the newest cached watched event.
 // It is necessary to effectively allow clients to start watching at now.
 store cache.Store
}

这里的关键数据结构依然是 2 个

cache 环形队列,存储有限个数的最新数据

store 底层实际上是个线程安全的 hashMap,存储全量数据

那么继续看看方法是怎么运转的吧~

增删改方法

func (w *watchCache) Update(obj interface{}) error {event := watch.Event{Type: watch.Modified, Object: object}
 f := func(obj runtime.Object) error { return w.store.Update(obj) }
 return w.processEvent(event, resourceVersion, f)

func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {previous, exists, err := w.store.Get(event.Object) watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion} w.onEvent(watchCacheEvent) w.updateCache(resourceVersion, watchCacheEvent) // Assumes that lock is already held for write. func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) {w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event} w.endIndex++ }

所有的增删改方法做的事情都差不多,就是在 store 里面存具体的数据,然后调用 processEvent()去增加环形队列里面的数据,如果详细看一下 onEvent 的操作,就会发现这个操作的本质是落在 cacher.go 里面:

func (c *Cacher) processEvent(event watchCacheEvent) {
 for _, watcher := range c.watchers {watcher.add(event)
}

往所有的 watcher 里面挨个添加数据。总体来说,我们可以从上面的代码中得出一个结论:cache 里面存储的是 Event,也就是有 prevObject 的,对于所有操作都会在 cache 里面保存,但对于 store 来说,只存储当下的数据,删了就删了,改了就改了。

WaitUntilFreshAndList()

这里本来应该讨论 List()方法的,但在 cacher 里面的 List()实际上使用的是这个,所以我们看这个方法。

func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {startTime := w.clock.Now()
 go func() {w.cond.Broadcast()
 for w.resourceVersion   resourceVersion {w.cond.Wait()
 return w.store.List(), w.resourceVersion, nil}

这个方法比较绕,前面使用了一堆 cond 通知来和其他协程通信,最后还是调用了 store.List()把数据返回出去。后面来具体分析这里的协调机制。

GetAllEventsSinceThreadUnsafe()

这个方法在 cacher 的创建 cacheWatcher 里面使用,把当前 store 里面的所有数据都搞出来,然后把 store 里面的数据都转换为 AddEvent,配上 cache 里面的 Event,全部返回出去。

3.3 CacheWatcher(pkg/storage/cacher.go)

这个结构是每个 watch 的 client 都会拥有一个的,从上面的分析中我们也能得出这个结构的需求,就是从 watchCache 里面搞一些数据,然后写到客户端那边。

// cacherWatch implements watch.Interface
type cacheWatcher struct {
 sync.Mutex
 input chan watchCacheEvent
 result chan watch.Event
 filter FilterFunc
 stopped bool
 forget func(bool)
}

这段代码比较简单,就不去分析方法了,简单说就是数据在增加的时候放到 input 这个 channel 里面去,通过 filter 然后输出到 result 这个 channel 里面去。

到此,相信大家对“apiserver 的 list-watch 怎么使用”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计10083字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)