如何解析client

62次阅读
没有评论

共计 7833 个字符,预计需要花费 20 分钟才能阅读完成。

今天就跟大家聊聊有关如何解析 client-go 中 workqueue,可能很多人都不太了解,为了让大家更加了解,丸趣 TV 小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

下面主要讲述下 client-go 中 workqueue, 看一下 client-go 的一个整体数据走向. 如下图:

而 workqueue 主要是在 listener 这里引用,listener 使用 chan 获取到数据之后将数据放入到工作队列进行处理。主要是由于 chan 过于简单,已经无法满足 K8S 的场景,所以衍生出了 workqueue,

特性

有序

去重

并发

延迟处理

限速

当前有三种 workqueue

基本队列

延迟队列

限速队列

其中延迟队列是基于基本队列实现的,而限流队列基于延迟队列实现

基本队列

看一下基本队列的接口

// client-go 源码路径 util/workqueue/queue.go
type Interface interface {
 // 新增元素   可以是任意对象
 Add(item interface{})
 // 获取当前队列的长度
 Len() int
 //  阻塞获取头部元素 (先入先出)  返回元素以及队列是否关闭
 Get() (item interface{}, shutdown bool)
 //  显示标记完成元素的处理
 Done(item interface{})
 // 关闭队列
 ShutDown()
 // 队列是否处于关闭状态
 ShuttingDown() bool}

看一下基本队列的数据结构, 只看三个重点处理的, 其他的没有展示出来

type Type struct {
 // 含有所有元素的元素的队列   保证有序
 queue []t
 // 所有需要处理的元素  set 是基于 map 以 value 为空 struct 实现的结构,保证去重
 dirty set
 // 当前正在处理中的元素
 processing set
 ...
type empty struct{}
type t interface{}
type set map[t]empty

基本队列的 hello world 也很简单

 wq := workqueue.New()
 wq.Add(hello)
 v, _ := wq.Get()

基本队列 Add

func (q *Type) Add(item interface{}) { q.cond.L.Lock()
 defer q.cond.L.Unlock()
 // 如果当前处于关闭状态, 则不再新增元素
 if q.shuttingDown {
 return
 }
 // 如果元素已经在等待处理中, 则不再新增
 if q.dirty.has(item) {
 return
 }
 // 添加到 metrics
 q.metrics.add(item)
 // 加入等待处理中
 q.dirty.insert(item)
 // 如果目前正在处理该元素   就不将元素添加到队列
 if q.processing.has(item) {
 return
 }
 q.queue = append(q.queue, item)
 q.cond.Signal()}

基本队列 Get

func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock()
 defer q.cond.L.Unlock()
 // 如果当前没有元素并且不处于关闭状态, 则阻塞
 for len(q.queue) == 0   !q.shuttingDown { q.cond.Wait()
 }
 ...
 item, q.queue = q.queue[0], q.queue[1:]
 q.metrics.get(item)
 // 把元素添加到正在处理队列中
 q.processing.insert(item)
 // 把队列从等待处理队列中删除
 q.dirty.delete(item)
 return item, false
}

基本队列实例化

func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
 t :=  Type{
 clock: c,
 dirty: set{},
 processing: set{},
 cond: sync.NewCond(sync.Mutex{}),
 metrics: metrics,
 unfinishedWorkUpdatePeriod: updatePeriod,
 }
 // 启动一个协程   定时更新 metrics
 go t.updateUnfinishedWorkLoop()
 return t
func (q *Type) updateUnfinishedWorkLoop() { t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
 defer t.Stop()
 for range t.C() { if !func() bool { q.cond.L.Lock()
 defer q.cond.L.Unlock()
 if !q.shuttingDown { q.metrics.updateUnfinishedWork()
 return true
 }
 return false
 }() {
 return
 }
 }
}

延迟队列

延迟队列的实现思路主要是使用优先队列存放需要延迟添加的元素, 每次判断最小延迟的元素书否已经达到了加入队列的要求 (延迟的时间到了), 如果是则判断下一个元素, 直到没有元素或者元素还需要延迟为止。

看一下延迟队列的数据结构

type delayingType struct {
 Interface
 ...
 // 放置延迟添加的元素
 waitingForAddCh chan *waitFor
 ...
}

主要是使用 chan 来保存延迟添加的元素, 而具体实现是通过一个实现了一个 AddAfter 方法,看一下具体的内容

// 延迟队列的接口
type DelayingInterface interface {
 Interface
 // AddAfter adds an item to the workqueue after the indicated duration has passed
 AddAfter(item interface{}, duration time.Duration)
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
 ...
 // 如果延迟实现小于等于 0   直接添加到队列
 if duration  = 0 { q.Add(item)
 return
 }
 select {
 case  -q.stopCh:
 // 添加到 chan, 下面会讲一下这个 chan 的处理
 case q.waitingForAddCh  -  waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
 }
}

延迟元素的处理

func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash()
 never := make(-chan time.Time)
 var nextReadyAtTimer clock.Timer
 waitingForQueue :=  waitForPriorityQueue{}
 // 这里是初始化一个优先队列   具体实现有兴趣的同学可以研究下
 heap.Init(waitingForQueue)
 waitingEntryByData := map[t]*waitFor{}
 for { if q.Interface.ShuttingDown() {
 return
 }
 now := q.clock.Now()
 // Add ready entries
 for waitingForQueue.Len()   0 { entry := waitingForQueue.Peek().(*waitFor)
 // 看一下第一个元素是否已经到达延迟的时间了
 if entry.readyAt.After(now) {
 break
 }
 // 时间到了, 将元素添加到工作的队列, 并且从延迟的元素中移除
 entry = heap.Pop(waitingForQueue).(*waitFor)
 q.Add(entry.data)
 delete(waitingEntryByData, entry.data)
 }
 // Set up a wait for the first item s readyAt (if one exists)
 nextReadyAt := never
 if waitingForQueue.Len()   0 {
 if nextReadyAtTimer != nil { nextReadyAtTimer.Stop()
 }
 // 如果还有需要延迟的元素, 计算第一个元素的延迟时间 (最小延迟的元素)
 entry := waitingForQueue.Peek().(*waitFor)
 nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
 nextReadyAt = nextReadyAtTimer.C()
 }
 select {
 case  -q.stopCh:
 return
 case  -q.heartbeat.C():
 // 定时检查下是否有元素达到延迟的时间
 case  -nextReadyAt:
 // 这里是上面计算出来的时间, 时间到了, 处理到达延迟时间的元素
 case waitEntry :=  -q.waitingForAddCh:
 // 检查是否需要延迟, 如果需要延迟就加入到延迟等待
 if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry)
 } else {
 // 如果不需要延迟就直接添加到队列
 q.Add(waitEntry.data)
 }
 drained := false
 for !drained {
 select { case waitEntry :=  -q.waitingForAddCh:

上面 waitingLoop 是在实例化延迟队列的时候调用的,看一下实例化时候的逻辑

func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
 // 实例化一个数据结构
 ret :=  delayingType{ Interface: NewNamed(name),
 clock: clock,
 heartbeat: clock.NewTicker(maxWait),
 stopCh: make(chan struct{}),
 waitingForAddCh: make(chan *waitFor, 1000),
 metrics: newRetryMetrics(name),
 }
 // 放到一个协程中处理延迟元素
 go ret.waitingLoop()
 return ret
}

限速队列

当前限速队列支持 4 中限速模式

令牌桶算法限速

排队指数限速

计数器模式

混合模式 (多种限速算法同时使用)

限速队列的底层实际上还是通过延迟队列来进行限速, 通过计算出元素的限速时间作为延迟时间

来看一下限速接口

type RateLimiter interface {
 //
 When(item interface{}) time.Duration
 // Forget indicates that an item is finished being retried. Doesn t matter whether its for perm failing
 // or for success, we ll stop tracking it
 Forget(item interface{})
 // NumRequeues returns back how many failures the item has had
 NumRequeues(item interface{}) int
}

看一下限速队列的数据结构

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
 DelayingInterface
 // 实际上底层还是调用的延迟队列, 通过计算出元素的延迟时间   进行限速
 AddRateLimited(item interface{})
 // Forget indicates that an item is finished being retried. Doesn t matter whether it s for perm failing
 // or for success, we ll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
 // still have to call `Done` on the queue.
 Forget(item interface{})
 // NumRequeues returns back how many times the item was requeued
 NumRequeues(item interface{}) int
func (q *rateLimitingType) AddRateLimited(item interface{}) {
 // 通过 when 方法计算延迟加入队列的时间
 q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

令牌桶算法

client-go 中的令牌桶限速是通过 golang.org/x/time/rat 包来实现的

可以通过 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 来使用令牌桶限速算法,其中第一个参数 qps 表示每秒补充多少 token,burst 表示总 token 上限为多少。

排队指数算法

排队指数可以通过 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 来使用。

这个算法有两个参数:

baseDelay 基础限速时间

maxDelay 最大限速时间

举个例子来理解一下这个算法,例如快速插入 5 个相同元素,baseDelay 设置为 1 秒,maxDelay 设置为 10 秒,都在同一个限速期内。第一个元素会在 1 秒后加入到队列,第二个元素会在 2 秒后加入到队列,第三个元素会在 4 秒后加入到队列,第四个元素会在 8 秒后加入到队列,第五个元素会在 10 秒后加入到队列 (指数计算的结果为 16,但是最大值设置了 10 秒)。

来看一下源码的计算

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock()
 defer r.failuresLock.Unlock()
 // 第一次为 0
 exp := r.failures[item]
 // 累加 1
 r.failures[item] = r.failures[item] + 1
 // 通过当前计数和 baseDelay 计算指数结果  baseDelay*(2 的 exp 次方)
 backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
 if backoff   math.MaxInt64 {
 return r.maxDelay
 }
 calculated := time.Duration(backoff)
 if calculated   r.maxDelay {
 return r.maxDelay
 }
 return calculated
}

计数器模式

计数器模式可以通过 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) 来使用,有三个参数

fastDelay 快限速时间

slowDelay 慢限速时间

maxFastAttempts 快限速元素个数

原理是这样的,假设 fastDelay 设置为 1 秒,slowDelay 设置为 10 秒,maxFastAttempts 设置为 3,同样在一个限速周期内快速插入 5 个相同的元素。前三个元素都是以 1 秒的限速时间加入到队列,添加第四个元素时开始使用 slowDelay 限速时间,也就是 10 秒后加入到队列,后面的元素都将以 10 秒的限速时间加入到队列,直到限速周期结束。

来看一下源码

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock()
 defer r.failuresLock.Unlock()
 // 添加一次就计数一次
 r.failures[item] = r.failures[item] + 1
 // 计数小于 maxFastAttempts 都以 fastDelay 为限速时间,否则以 slowDelay 为限速时间
 if r.failures[item]  = r.maxFastAttempts {
 return r.fastDelay
 }
 return r.slowDelay
}

混合模式

最后一种是混合模式,可以组合使用不同的限速算法实例化限速队列

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter { return  MaxOfRateLimiter{limiters: limiters}
}

在 k8s-client-go 的源码中可以看到,大量的接口组合运用,将各种功能拆分成各个细小的库,是一种非常值得学习的代码风格以及思路。

看完上述内容,你们对如何解析 client-go 中 workqueue 有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注丸趣 TV 行业资讯频道,感谢大家的支持。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计7833字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)