KAFKA中rebalance是什么

99次阅读
没有评论

共计 25687 个字符,预计需要花费 65 分钟才能阅读完成。

本篇内容介绍了“KAFKA 中 rebalance 是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、写在前面

让我们从头到尾梳理一下 rebalance。

二、什么是 rebalance?

中文直译,就是重平衡。
是什么去重平衡呢?消费组内的消费者成员去重平衡。(消费组的概念如果不清楚各位先自行百度,后续我写到消费模块的时候才会提到这些概念)
为什么需要重平衡呢?因为消费组内成员的故障转移和动态分区分配。

翻译一下:
消费组内成员的故障转移:当一个消费组内有三个消费者 A,B,C,分别消费分区:a,b,c

A -  a
B -  b
C -  c

此时如果 A 消费者出了点问题,那么就意味着 a 分区没有消费者进行消费了,那这肯定不行,那么就通过 rebalance 去将 a 分区分配给其他还存活着的消费者客户端,rebalance 后可能得到的消费策略:

A -  a (GG)B -  b,a
C -  c

这就是消费组内成员的故障转移,就是某个消费者客户端出问题之后把它原本消费的分区通过 REBALNACE 分配给其他存活的消费者客户端。

动态分区分配:当某个 topic 的分区数变化,对于消费组而言可消费的分区数变化了,因此就需要 rebalance 去重新进行动态分区分配,举个栗子,原本某 topic 只有 3 个分区,我现在扩成了 10 个分区,那么不就意味着多了 7 个分区没有消费者消费吗?这显然是不行的,因此就需要 rebalance 过程去进行分区分配,让现有的消费者去把这 10 个分区全部消费到。

三、rebalance 是怎么触发的?

这个其实在上面一小节已经提到的差不多了,在这个小节再做一点补充和总结。
触发条件:

消费组内成员变化:下线 / 上线 / 故障被踢出。

消费的分区数变化:topic 被删了,topic 分区数增加了。

coordinator 节点出问题了:因为消费组的元数据信息都是在 coordinator 节点的,因此 coordinator 节点出问题也会触发 rebalance 去找一个新的 coordinator 节点。怎么找呢?显然就是走一遍 FIND_COORDINATOR 请求嘛,然后找到负载最低的那个节点问一下,我的新的 coordinator 在哪儿呀?然后得到答案之后让消费者客户端去连新的 coordinator 节点。

四、rebalance 的宏观过程

整个 rebalance 的过程,是一个状态机流转的过程,整体过程示意图如下:图源:https://www.cnblogs.com/huxi2b/p/6815797.html
其实上面这个状态机流转过程在明白原理的情况下,已经非常清晰了,但是如果没看过源码的,依旧不知道为什么是这么流转的,什么情况下状态是 Empty 呢,什么状态下是 Stable 呢?什么时候 Empty 状态会转换为 PreparingRebalance 状态呢?
下面我就根据请求顺序来看下整个状态的流转过程:
让我们来回答上个小节后面提出的几个比较细节的问题:

这些请求都带有哪些关键数据?
在 FIND_COORDINATOR 请求的时候,会带上自己的 group.id 值,这个值是用来计算它的 coordinator 到底在哪儿的,对应的计算方法就是:coordinatorId=groupId.hash % 50 这个算出来是个数字,代表着具体的分区,哪个 topic 的分区呢?显然是__consumer_offsets 了。
在 JOIN_GROUP 请求的时候,是没带什么关键参数的,但是在响应的时候会挑选一个客户端作为 leader,然后在响应中告诉它被选为了 leader 并且把消费组元数据信息发给它,然后让该客户端去进行分区分配。
在 SYNC_GROUP 请求的时候,leader 就会带上它根据具体的策略已经分配好的分区分配方案,服务端收到后就更新到元数据里面去,然后其余的 consumer 客户端只要一发送 SYNC 请求过来就告诉它要消费哪些分区,然后让它自己去消费就 ok 了。

到底是哪个阶段导致 rebalance 过程会劣化到几分钟?
我图中特意将 JOIN 阶段标位红色,就是让这个阶段显得显眼一些,没错就是这个阶段会导致 rebalance 整个过程耗时劣化到几分钟。
具体的原因就是 JOIN 阶段会等待原先组内存活的成员发送 JOIN_GROUP 请求过来,如果原先组内的成员因为业务处理一直没有发送 JOIN_GROUP 请求过来,服务端就会一直等待,直到超时。这个超时时间就是 max.poll.interval.ms 的值,默认是 5 分钟,因此这种情况下 rebalance 的耗时就会劣化到 5 分钟,导致所有消费者都无法进行正常消费,影响非常大。

为什么要分为这么多阶段?
这个主要是设计上的考虑,整个过程设计的还是非常优雅的,第一次连上的情况下需要三次请求,正常运行的 consumer 去进行 rebalance 只需要两次请求,因为它原先就知道自己的 coordinator 在哪儿,因此就不需要 FIND_COORDINATOR 请求了,除非是它的 coordinator 宕机了。

回答完这些问题,是不是对整个 rebalance 过程理解加深一些了呢?其实还有很多细节没有涉及到,例如 consumer 客户端什么时候会进入 rebalance 状态?服务端是如何等待原先消费组内的成员发送 JOIN_GROUP 请求的呢?这些问题就只能一步步看源码了。

FIND_COORDINATOR 请求的源码我就不打写了,很简单大家可以自己翻一下,就是带了个 group.id 上去,上面都提到了。

六、JOIN 阶段源码分析

从这段函数我们知道,如果加入一个新的消费组,服务端收到第一个 JOIN 请求的时候会创建 group,这个 group 的初始状态为 Empty

 //  如果 group 都还不存在,就有了 memberId, 则认为是非法请求,直接拒绝。 groupManager.getGroup(groupId) match {
 case None = 
 //  这里 group 都还不存在的情况下,memberId 自然是空的
 if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) { responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
 } else {
 //  初始状态是 EMPTY
 val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
 //  执行具体的加组操作
 doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
 }
 case Some(group) = 
 doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
 }

让我们进入 doJoinGroup 函数,看下里面的核心逻辑:

 case Empty | Stable = 
 //  初始状态是 EMPTY, 添加 member 并且执行 rebalance
 if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
 // if the member id is unknown, register the member to the group
 addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
 } else {
 // ...
 } else {
  //...
 }
 private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
 sessionTimeoutMs: Int,
 clientId: String,
 clientHost: String,
 protocolType: String,
 protocols: List[(String, Array[Byte])],
 group: GroupMetadata,
 callback: JoinCallback) = {
 //  根据 clientID 初始化 memberID
 val memberId = clientId +  -
 def add(member: MemberMetadata) { if (members.isEmpty)
 this.protocolType = Some(member.protocolType)
 assert(groupId == member.groupId)
 assert(this.protocolType.orNull == member.protocolType)
 assert(supportsProtocols(member.protocols))
 // coordinator 选举 leader 很简单,就第一个发送 join_group 请求的那个 member
 if (leaderId.isEmpty)
 leaderId = Some(member.memberId)
 members.put(member.memberId, member)
 }

上面的代码翻译一下很简单,就是新来了一个 member,封装一下,添加到这个 group 中,需要说一下的就是当组状态是 Empty 的情况下,谁先连上谁就是 leader。紧接着就准备 rebalance:

 private def maybePrepareRebalance(group: GroupMetadata) {
 group.inLock { if (group.canRebalance)
 prepareRebalance(group)
 }
 }
 //  这里是传入 PreparingRebalance 状态,然后获取到一个 SET
 //  翻译一下:就是只有这个 SET(Stable, CompletingRebalance, Empty)里面的状态,才能开启 rebalance
 def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)
 private val validPreviousStates: Map[GroupState, Set[GroupState]] =
 Map(Dead -  Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
 CompletingRebalance -  Set(PreparingRebalance),
 Stable -  Set(CompletingRebalance),
 PreparingRebalance -  Set(Stable, CompletingRebalance, Empty),
 Empty -  Set(PreparingRebalance))
 private def prepareRebalance(group: GroupMetadata) {
 // if any members are awaiting sync, cancel their request and have them rejoin
 if (group.is(CompletingRebalance))
 resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
 val delayedRebalance = if (group.is(Empty))
 new InitialDelayedJoin(this,
 joinPurgatory,
 group,
 groupConfig.groupInitialRebalanceDelayMs,//  默认 3000ms, 即 3s
 groupConfig.groupInitialRebalanceDelayMs,
 max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
 else
 new DelayedJoin(this, group, group.rebalanceTimeoutMs)//  这里这个超时时间是客户端的 poll 间隔,默认 5 分钟
 //  状态机转换:EMPTY -  PreparingRebalance
 group.transitionTo(PreparingRebalance)
 // rebalance 开始标志日志
 info(s Preparing to rebalance group ${group.groupId} with old generation ${group.generationId}   +
 s (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) )
 //  加入时间轮
 val groupKey = GroupKey(group.groupId)
 joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
 }

上面这段代码有两个关键点,一个是判断当前能否进入 rebalance 过程,可以看到只有(Stable, CompletingRebalance, Empty)里面的状态,才能开启 rebalance,而最开始来到第一个 member 的时候,组的状态是 Empty 显然是能进来的,但是近来之后就给转换为了 PreparingRebalance 状态,那么后续的 member 发送 JOIN 请求过来之后就进不来了,就只能设置个回调后一直等。
那么要等到什么时候呢?第二段代码写的很清楚就是等待延时任务超时,这个延时任务创建是根据当前状态来判断的,如果是 Empty 就创建一个 InitialDelayedJoin 延时任务,超时时间是 3s;如果不是 Empty 就创建一个 DelayedJoin,超时时间默认是 5min。看,源码出真知,这就是 JOIN 阶段等待 member 的代码实现。
这里需要补充一下,为什么 Empty 的状态下要等待 3s 呢?这其实是一个优化,主要就是优化多消费者同时连入的情况。举个栗子,10 个消费者都能在 3s 内启动然后练上,如果你等着 3s 时间那么一次 rebalance 过程就搞定了,如果你不等,那么就意味着来一个就又要开启一次 rebalance,一共要进行 10 次 rebalance,这个耗时就比较长了。具体的细节可以查看:https://www.cnblogs.com/huxi2b/p/6815797.html
另外就是,为什么状态不是 Empty 的时候就延时 5 分钟呢?这个其实上面就回答了,要等待原来消费组内在线的消费者发送 JOIN 请求,这个也是 rebalance 过程耗时劣化的主要原因。

接下来我们看看这两个延时任务,在超时的时候分别都会做些啥,首先是 InitialDelayedJoin:

/**
 * Delayed rebalance operation that is added to the purgatory when a group is transitioning from
 * Empty to PreparingRebalance
 *
 * When onComplete is triggered we check if any new members have been added and if there is still time remaining
 * before the rebalance timeout. If both are true we then schedule a further delay. Otherwise we complete the
 * rebalance.
 */
private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
 purgatory: DelayedOperationPurgatory[DelayedJoin],
 group: GroupMetadata,
 configuredRebalanceDelay: Int,
 delayMs: Int,
 remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {
 //  这里写死是 false,是为了在 tryComplete 的时候不被完成
 override def tryComplete(): Boolean = false
 override def onComplete(): Unit = {
 //  延时任务处理
 group.inLock {
 // newMemberAdded 是后面有新的 member 加进来就会是 true
 // remainingMs 第一次创建该延时任务的时候就是 3s。 //  所以这个条件在第一次的时候都是成立的
 if (group.newMemberAdded   remainingMs != 0) {
 group.newMemberAdded = false
 val delay = min(configuredRebalanceDelay, remainingMs)
 //  最新计算的 remaining 恒等于 0,其实本质上就是 3 -3=0, //  所以哪怕这里是新创建了一个 InitialDelayedJoin,这个任务的超时时间就是下一刻
 //  这么写的目的其实就是相当于去完成这个延时任务
 val remaining = max(remainingMs - delayMs, 0)
 purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
 purgatory,
 group,
 configuredRebalanceDelay,
 delay,
 remaining
 ), Seq(GroupKey(group.groupId)))
 } else
 //  如果没有新的 member 加入,直接调用父类的函数
 //  完成 JOIN 阶段
 super.onComplete()
 }
 }
}

大意我都写在注释里面了,其实就是等待 3s,然后完了之后调用父类的函数完成整个 JOIN 阶段,不过不联系上下文去看,还是挺费劲的,对了看这个需要对时间轮源码有了解,正好我前面有写,大家如果有什么不清楚的可以去看下。
接着看下 DelayedJoin 超时后会干嘛:

/**
 * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
 *
 * Whenever a join-group request is received, check if all known group members have requested
 * to re-join the group; if yes, complete this operation to proceed rebalance.
 *
 * When the operation has expired, any known members that have not requested to re-join
 * the group are marked as failed, and complete this operation to proceed rebalance with
 * the rest of the group.
 */
private[group] class DelayedJoin(coordinator: GroupCoordinator,
 group: GroupMetadata,
 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) { override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
 override def onExpiration() = coordinator.onExpireJoin()
 override def onComplete() = coordinator.onCompleteJoin(group)
 //  超时之后啥也没干,哈哈,因为确实不用做啥,置空就好了
 //  核心是 onComplete 函数和 tryComplete 函数
 def onExpireJoin() {
 // TODO: add metrics for restabilize timeouts
 }
 def tryCompleteJoin(group: GroupMetadata, forceComplete: () =  Boolean) = {
 group.inLock { if (group.notYetRejoinedMembers.isEmpty)
 forceComplete()
 else false
 }
 }
 def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
 
 def forceComplete(): Boolean = { if (completed.compareAndSet(false, true)) {
 // cancel the timeout timer
 cancel()
 onComplete()
 true
 } else {
 false
 }
 }
 def onCompleteJoin(group: GroupMetadata) {
 group.inLock {
 // remove any members who haven t joined the group yet
 //  如果组内成员依旧没能连上,那么就删除它,接收当前 JOIN 阶段
 group.notYetRejoinedMembers.foreach { failedMember = 
 group.remove(failedMember.memberId)
 // TODO: cut the socket connection to the client
 }
 if (!group.is(Dead)) {
 //  状态机流转  : preparingRebalancing -  CompletingRebalance
 group.initNextGeneration()
 if (group.is(Empty)) { info(s Group ${group.groupId} with generation ${group.generationId} is now empty   +
 s (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) )
 groupManager.storeGroup(group, Map.empty, error =  { if (error != Errors.NONE) {
 // we failed to write the empty group metadata. If the broker fails before another rebalance,
 // the previous generation written to the log will become active again (and most likely timeout).
 // This should be safe since there are no active members in an empty generation, so we just warn.
 warn(s Failed to write empty metadata for group ${group.groupId}: ${error.message} )
 }
 })
 } else {
 // JOIN 阶段标志结束日志
 info(s Stabilized group ${group.groupId} generation ${group.generationId}   +
 s (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) )
 // trigger the awaiting join group response callback for all the members after rebalancing
 for (member  - group.allMemberMetadata) { assert(member.awaitingJoinCallback != null)
 val joinResult = JoinGroupResult(
 //  如果是 leader  就返回 member 列表及其元数据信息
 members = if (group.isLeader(member.memberId)) {
 group.currentMemberMetadata
 } else {
 Map.empty
 },
 memberId = member.memberId,
 generationId = group.generationId,
 subProtocol = group.protocolOrNull,
 leaderId = group.leaderOrNull,
 error = Errors.NONE)
 member.awaitingJoinCallback(joinResult)
 member.awaitingJoinCallback = null
 completeAndScheduleNextHeartbeatExpiration(group, member)
 }
 }
 }
 }
 }

上面这一串代码有几个要点,首先,这个任务超时的时候是啥也不干的,为什么呢?这里要了解时间轮的机制,代码也在上面,当一个任务超时的时候,时间轮强制执行对应任务的 onComplete 函数,然后执行 onExpiration 函数,其实 onExpiration 函数对于这个延时任务来说是没有意义的,并不需要做什么,打日志都懒得打。
第二点就是这个任务 onComplete 什么时候会被调用呢?难道就只能等待 5 分钟超时才能被调用吗?那不是每一次 rebalance 都必须要等待 5 分钟?当然不可能啦,这里就需要先看下 tryComplete 函数的内容,发现这个内容会去检查还没连上的 member,如果发现到期了,就强制完成。那么我们看下这 tryComplete 是在哪儿被调用的?这里需要插入一点之前没贴全的代码,在 doJoinGroup 函数中的而最后一段:

if (group.is(PreparingRebalance))
 joinPurgatory.checkAndComplete(GroupKey(group.groupId))

这段代码非常关键,当当前状态是 PreparingRebalance 的时候,会尝试去完成当前的延时任务,最终调用的代码:

 private[server] def maybeTryComplete(): Boolean = {
 var retry = false
 var done = false
 do { if (lock.tryLock()) {
 try { tryCompletePending.set(false)
 done = tryComplete()
 } finally { lock.unlock()
 }
 // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
 // `tryCompletePending`. In this case we should retry.
 retry = tryCompletePending.get()
 } else {
 // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
 // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
 // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
 // released the lock and returned by the time the flag is set.
 retry = !tryCompletePending.getAndSet(true)
 }
 } while (!isCompleted   retry)
 done
 }

就是上面的 tryComplete 函数,最终会调用到 DelayedJoin 中的 tryComplete 函数,什么意思呢?已经很明显了,每来一个 JOIN 请求的时候,如果处于 PreparingRebalance 阶段,都会去检查一下 group 中原来的成员是否已经到齐了,到齐了就立刻结束 JOIN 阶段往后走。看到这儿,回头看下 InitialDelayedJoin 这个延时任务的 tryComplete 为什么就默认实现了个 false 呢?也明白了,就是初始化延时任务的时候不让你尝试完成,我就等 3s,不需要你们来触发我提前完成。

以上,我们就看完了整个服务端的 JOIN 请求处理过程,其实主要核心就是这两个延时任务,如果不联系上下文,不了解时间轮机制,看起来确实费劲。接下来就看下 SYNC 阶段是如何处理的。

七、SYNC 阶段源码分析

直接看下面的核心源码逻辑:

 private def doSyncGroup(group: GroupMetadata,
 generationId: Int,
 memberId: String,
 groupAssignment: Map[String, Array[Byte]],
 responseCallback: SyncCallback) {
 group.inLock { if (!group.has(memberId)) { responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
 } else if (generationId != group.generationId) { responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
 } else {
 group.currentState match {
 case Empty | Dead = 
 responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
 case PreparingRebalance = 
 responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
 //  只有 group 处于 compeletingRebalance 状态下才会被处理
 //  其余状态都是错误的状态
 case CompletingRebalance = 
 //  给当前 member 设置回调,之后就啥也不干,也不返回
 //  等到 leader 的分区方案就绪后,才会被返回。 group.get(memberId).awaitingSyncCallback = responseCallback
 // if this is the leader, then we can attempt to persist state and transition to stable
 //  只有收到 leader 的 SYNC 才会被处理,并进行状态机流转
 if (group.isLeader(memberId)) { info(s Assignment received from leader for group ${group.groupId} for generation ${group.generationId} )
 // fill any missing members with an empty assignment
 val missing = group.allMembers -- groupAssignment.keySet
 val assignment = groupAssignment ++ missing.map(_ -  Array.empty[Byte]).toMap
 groupManager.storeGroup(group, assignment, (error: Errors) =  {
 group.inLock {
 // another member may have joined the group while we were awaiting this callback,
 // so we must ensure we are still in the CompletingRebalance state and the same generation
 // when it gets invoked. if we have transitioned to another state, then do nothing
 if (group.is(CompletingRebalance)   generationId == group.generationId) { if (error != Errors.NONE) { resetAndPropagateAssignmentError(group, error)
 maybePrepareRebalance(group)
 } else { setAndPropagateAssignment(group, assignment)
 //  状态机流转:CompletingRebalance -  Stable
 group.transitionTo(Stable)
 }
 }
 }
 })
 }
 //  如果已经处于 stable 状态,说明 leader 已经把分区分配方案传上来了
 //  那么直接从 group 的元数据里面返回对应的方案就好了
 case Stable = 
 // if the group is stable, we just return the current assignment
 val memberMetadata = group.get(memberId)
 responseCallback(memberMetadata.assignment, Errors.NONE)
 //  开启心跳检测
 completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
 }
 }
 }
 }

我们可能对上面的代码案处理会有一个疑问,为什么只有 leader 的 SYNC 请求才会被处理呢?要是其他 consumer 比 leader 早上来了难道就卡这儿不管了?不像 JOIN 阶段那样加入个时间轮设置个最大超时时间?这要是 leader 一直不发送 SNYC 请求,那不就所有成员都这儿干等着,无限等待了?
我们一个个来回答,首先,我们看上面的代码,每个请求过来第一件事是先设置回调,然后才去卡住等着,直到 leader 把分区分配方案通过 SYNC 请求带上来。
第二个问题,如果其他 consumer 比 leader 早到了就这么干等着吗?是的,没错,代码就是这么写的。
第三个问题,为什么不设置个最大超时时间啥的?我们可以看下客户端的代码,一旦开启 rebalance 之后,就只会进行相关请求的收发,意味着 leader 在收到 JOIN 阶段的返回后,中间不会有任何业务代码的影响,直接就是分配完分区然后发送 SYNC 请求;这就意味着 leader 的 JOIN 响应和 SYNC 请求之间理论上是不存在阻塞的,因此就可以不用设置超时,就不用加入时间轮了。
第四个问题,leader 一直不发送 SYNC 请求就干等着?是的,代码就是这么写的。不过你想想,哪些情况能让 leader 一直不发送 SYNC 请求?我能想到的就是 GC/leader 宕机了,无论是哪种情况都会因为心跳线程出了问题被服务端检测到,因此在对应的心跳任务超时后重新开启下一轮的 rebalance。哪怕是 GC 很长时间之后恢复了继续发 SYNC 请求过来,也会因为 generation 不匹配而得到错误返回开启下一轮 rebalance。
最后再看下 leader 到了之后会具体做啥:

 private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) { assert(group.is(CompletingRebalance))
 //  给每个 member 的分配方案赋值
 group.allMemberMetadata.foreach(member =  member.assignment = assignment(member.memberId))
 //  在整个 group 中传播这个分配方案
 propagateAssignment(group, Errors.NONE)
 }
 
 private def propagateAssignment(group: GroupMetadata, error: Errors) {
 //  遍历
 //  如果是 follower 比 leader 先到 SYNC 请求
 //  那么就只会设置个 callback,就啥都不干了,也不会返回
 //  直到 leader 带着分配方案来了以后,把状态更改为 stable 之后,才会遍历
 //  看看有哪些 member 已经发送了请求过来,设置了 callback,然后一次性给他们返回回去对应的分区方案
 //  所以这个名称叫做【传播分配方案】 //  真是绝妙
 for (member  - group.allMemberMetadata) { if (member.awaitingSyncCallback != null) {
 //  通过回调告诉 member 对应的分配方案
 member.awaitingSyncCallback(member.assignment, error)
 member.awaitingSyncCallback = null
 // reset the session timeout for members after propagating the member s assignment.
 // This is because if any member s session expired while we were still awaiting either
 // the leader sync group or the storage callback, its expiration will be ignored and no
 // future heartbeat expectations will not be scheduled.
 completeAndScheduleNextHeartbeatExpiration(group, member)
 }
 }
 }

看,最开始设置的回调,在收到 leader 请求时候,起了作用;会被挨个遍历后响应具体的分区分配方案,另外就是 kafka 里面的命名都很准确。

SYNC 阶段简单说起来就是等待 leader 把分区分配方案传上来,如果 member 先到就设置个回调先等着,如果 leader 先到,就直接把分区分配方案存到 group 的元数据中,然后状态修改为 Stable,后续其他 member 来的 SYNC 请求就直接从 group 的元数据取分区分配方案,然后自己消费去了。

八、线上如何排查 rebalance 问题?
看完理论,让我们来看下线上问题怎么排查 rebalance 问题。rebalance 有哪些问题呢?我们来整理一下:

为什么会 rebalance 呢?是什么引起的?能定位到是哪个客户端嘛?

rebalance 耗时了多久?为什么会劣化?常见的就上面两个问题,我们按个来回答。

首先,为什么会 rebalance,这个就三种情况,分区信息变化、客户端变化、coordinator 变化。
一般线上常见的就是客户端变化,那么客户端有哪些可能的变化呢?——新增成员,减少成员。

新增成员怎么看呢?很简单嘛,找到 coordinator,然后去 kafka-request.log 里面搜:cat kafka-request.log |grep -i find | grep -i ${group.id} 不过一般 FIND_COORDINATOR 请求的处理时间都小于 10ms,所以只能打开 debug 日志才能看到。一般这种让客户自己看,对应的时间点是不是有启动 kafka-consumer 就行了,其实也不常见,这种情况。毕竟很少有人频繁开启关闭消费者,就算是有也是不好的业务使用方式。

减少成员呢?又分为两种:心跳超时,poll 间隔超过配置 心跳超时的标识日志:

 def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
 group.inLock { if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
 //  标识日志
 info(s Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group )
 removeMemberAndUpdateGroup(group, member)
 }
 }
 }

很遗憾 poll 间隔超时,在 1.1.0 版本的 info 级别下并没有可查找的日志,检测 poll 时间间隔超时的是对应客户端的心跳线程,在检测到超过配置后就会主动 leaveGroup 从而触发 rebalance,而这个请求在服务端依旧没有 info 级别的请求,因此,要判断是 poll 间隔超时引起的 rebalance,就只能看下有没有上面心跳超时的日志,如果没有可能就是因为这个原因造成的。目前大多数的 rebalance 都是因为这个原因造成的,而且这个原因引发的 rebalance 同时还可能伴随着很长的 rebalance 耗时。
来看下服务端是如何做 poll 间隔超时的呢?

} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
 maybeLeaveGroup();
public boolean sessionTimeoutExpired(long now) { return now - Math.max(lastSessionReset, lastHeartbeatReceive)   sessionTimeout;
 public synchronized void maybeLeaveGroup() { if (!coordinatorUnknown()   state != MemberState.UNJOINED   generation != Generation.NO_GENERATION) {
 // this is a minimal effort attempt to leave the group. we do not
 // attempt any resending if the request fails or times out.
 log.debug(Sending LeaveGroup request to coordinator {} , coordinator);
 LeaveGroupRequest.Builder request =
 new LeaveGroupRequest.Builder(groupId, generation.memberId);
 client.send(coordinator, request)
 .compose(new LeaveGroupResponseHandler());
 client.pollNoWakeup();
 }
 resetGeneration();
 }

总结一下,怎么定位 rebalance 的问题,就是找标志日志,然后排除法,实在不行了就打开 debug 日志。

接着看第二个问题,rebalance 一次的时间耗费了多久?为什么会劣化到几分钟?因为整个 rebalance 过程是线性的过程,就是状态按照请求顺序流转,因此呢找到对应的标志日志就好啦。开启的标志日志:

// rebalance 开始标志日志
info(s Preparing to rebalance group ${group.groupId} with old generation ${group.generationId}   +
 s (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) )

结束的两种标识日志:这两种结束日志都行,因为都差不多代表着 rebalance 过程完成,原因在上面已经讲的很清楚了。

 // JOIN 阶段标志结束日志
 info(s Stabilized group ${group.groupId} generation ${group.generationId}   +
 s (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) )
// SYNC 阶段结束日志
info(s Assignment received from leader for group ${group.groupId} for generation ${group.generationId} )

那么如何统计整个 rebalance 过程的时间呢?显而易见,结束时间 – 开始时间呀。

知道是怎么什么原因开启了 rebalance 之后,该怎么定位业务问题呢?心跳超时:因为心跳线程是守护线程,一般都是因为客户端的机器负载太高导致心跳现场无法获取到 CPU 导致的。
poll 间隔超过配置:显然嘛,就是 poll 出来数据之后,进行业务处理的时候太慢了,建议根据业务优化消费逻辑,改成多线程消费或者异步消费。

九、消费者如何感知到 rebalance 的呢?

这个很简单,我们想一下,与这个 group 有关的元数据全部都在 coordinator 那里,哪些请求会与 coordinator 交互呢?HEARTBEAT/OFFSET_COMMIT 嘛,就这俩,那么其实正常的 member 都是靠这两个请求来感知到自己要去进行 rebalance 的,我们分别来看下。

首先是 HEARTBEAT 请求,每次都会带上当前消费组的 generation 值,也就是纪元值,要是服务端 rebalance 已经完成了,纪元值 +1,那么此时就会发现自己没匹配上,然后紧接着就去设置自己的 RejoinNeeded 的标识,在下一轮 poll 的时候就会去开启 rebalance。
如果说是 rebalance 还没完成,那就更简单了,发现 group 的状态不是 stable,直接就返回对应的错误,然后设置标识,加入到 rebalance 过程中。
服务端源码:

 case Some(group) = 
 group.inLock {
 group.currentState match {
 case Dead = 
 // if the group is marked as dead, it means some other thread has just removed the group
 // from the coordinator metadata; this is likely that the group has migrated to some other
 // coordinator OR the group is in a transient unstable phase. Let the member retry
 // joining without the specified member id,
 responseCallback(Errors.UNKNOWN_MEMBER_ID)
 case Empty = 
 responseCallback(Errors.UNKNOWN_MEMBER_ID)
 case CompletingRebalance = 
 if (!group.has(memberId))
 responseCallback(Errors.UNKNOWN_MEMBER_ID)
 else
 responseCallback(Errors.REBALANCE_IN_PROGRESS)
 case PreparingRebalance = 
 if (!group.has(memberId)) { responseCallback(Errors.UNKNOWN_MEMBER_ID)
 } else if (generationId != group.generationId) { responseCallback(Errors.ILLEGAL_GENERATION)
 } else { val member = group.get(memberId)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 responseCallback(Errors.REBALANCE_IN_PROGRESS)
 }
 case Stable = 
 if (!group.has(memberId)) { responseCallback(Errors.UNKNOWN_MEMBER_ID)
 //  纪元切换
 } else if (generationId != group.generationId) { responseCallback(Errors.ILLEGAL_GENERATION)
 } else { val member = group.get(memberId)
 //  完成上次的延时,新建新的延时任务
 completeAndScheduleNextHeartbeatExpiration(group, member)
 //  回调响应
 responseCallback(Errors.NONE)
 }

客户端源码:

 private class HeartbeatResponseHandler extends CoordinatorResponseHandler HeartbeatResponse, Void  {
 @Override
 public void handle(HeartbeatResponse heartbeatResponse, RequestFuture Void  future) { sensors.heartbeatLatency.record(response.requestLatencyMs());
 Errors error = heartbeatResponse.error();
 if (error == Errors.NONE) {
 log.debug( Received successful Heartbeat response 
 future.complete(null);
 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
 || error == Errors.NOT_COORDINATOR) { log.debug( Attempt to heartbeat since coordinator {} is either not started or not valid. ,
 coordinator());
 markCoordinatorUnknown();
 future.raise(error);
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.debug( Attempt to heartbeat failed since group is rebalancing 
 requestRejoin();
 future.raise(Errors.REBALANCE_IN_PROGRESS);
 } else if (error == Errors.ILLEGAL_GENERATION) { log.debug( Attempt to heartbeat failed since generation {} is not current , generation.generationId);
 resetGeneration();
 future.raise(Errors.ILLEGAL_GENERATION);
 } else if (error == Errors.UNKNOWN_MEMBER_ID) { log.debug( Attempt to heartbeat failed for since member id {} is not valid. , generation.memberId);
 resetGeneration();
 future.raise(Errors.UNKNOWN_MEMBER_ID);
 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId));
 } else { future.raise(new KafkaException( Unexpected error in heartbeat response:   + error.message()));
 }
 }
 }
 protected synchronized void requestRejoin() {
 this.rejoinNeeded = true;
 }

所以我们客户端看到这种异常,就知道怎么回事了,就是我在 rebalance 的过程中,或者已经完成了,客户端的纪元不对。

 REBALANCE_IN_PROGRESS(27,  The group is rebalancing, so a rejoin is needed. ,
 new ApiExceptionBuilder() {
 @Override
 public ApiException build(String message) { return new RebalanceInProgressException(message);
 }
 }),
 ILLEGAL_GENERATION(22,  Specified group generation id is not valid. ,
 new ApiExceptionBuilder() {
 @Override
 public ApiException build(String message) { return new IllegalGenerationException(message);
 }
 }),

我们再看 OFFSET_COMMIT 请求,其实和 HEARTBEAT 请求是基本一致的。
服务端:

 group.inLock { if (group.is(Dead)) { responseCallback(offsetMetadata.mapValues(_ =  Errors.UNKNOWN_MEMBER_ID))
 } else if ((generationId   0   group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
 // The group is only using Kafka to store offsets.
 // Also, for transactional offset commits we don t need to validate group membership and the generation.
 groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
 } else if (group.is(CompletingRebalance)) { responseCallback(offsetMetadata.mapValues(_ =  Errors.REBALANCE_IN_PROGRESS))
 } else if (!group.has(memberId)) { responseCallback(offsetMetadata.mapValues(_ =  Errors.UNKNOWN_MEMBER_ID))
 } else if (generationId != group.generationId) { responseCallback(offsetMetadata.mapValues(_ =  Errors.ILLEGAL_GENERATION))
 } else { val member = group.get(memberId)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
 }
 }
 }

客户端:

else if (error == Errors.UNKNOWN_MEMBER_ID
 || error == Errors.ILLEGAL_GENERATION
 || error == Errors.REBALANCE_IN_PROGRESS) {
 // need to re-join group
 resetGeneration();
 future.raise(new CommitFailedException());
 return;
 /**
 * Reset the generation and memberId because we have fallen out of the group.
 */
 protected synchronized void resetGeneration() {
 this.generation = Generation.NO_GENERATION;
 this.rejoinNeeded = true;
 this.state = MemberState.UNJOINED;
 }

从源码我们可以看到,客户端在感知 rebalance 主要通过两个机制,一个是状态,一个是纪元;状态生效于 rebalance 过程中,纪元生效于 rebalance 的 JOIN 阶段结束后。
与 coordinator 交互的这两个请求都会带上自己的纪元信息,在服务端处理前都会校验一下状态已经纪元信息,一旦不对,就告诉客户端你需要 rebalance 了。

十、线上如何减小 rebalance 的影响?

首先明确下,rebalance 会有什么影响?引用 JVM 的术语来说,就是 STOP THE WORLD。
一旦开启 rebalance 过程,在消费者进入 JOIN 阶段后就无法再继续消费,就是整个 group 的成员全部 STW,所以对业务的影响还是很大的。

“KAFKA 中 rebalance 是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-25发表,共计25687字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)