共计 10698 个字符,预计需要花费 27 分钟才能阅读完成。
Kafka 消费与心跳机制如何理解,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
导读 kafka 是一个分布式,分区的,多副本的,多订阅者的消息发布订阅系统(分布式 MQ 系统),可以用于搜索日志,监控日志,访问日志等。kafka 是一个分布式,分区的,多副本的,多订阅者的消息发布订阅系统(分布式 MQ 系统),可以用于搜索日志,监控日志,访问日志等。今天丸趣 TV 小编来领大家一起来学习一下 Kafka 消费与心跳机制。
1、Kafka 消费
首先,我们来看看消费。Kafka 提供了非常简单的消费 API,使用者只需初始化 Kafka 的 Broker Server 地址,然后实例化 KafkaConsumer 类即可拿到 Topic 中的数据。一个简单的 Kafka 消费实例代码如下所示:
public class JConsumerSubscribe extends Thread { public static void main(String[] args) { JConsumerSubscribe jconsumer = new JConsumerSubscribe(); jconsumer.start(); } /** 初始化 Kafka 集群信息. */ private Properties configure() { Properties props = new Properties(); props.put(bootstrap.servers , dn1:9092,dn2:9092,dn3:9092 // 指定 Kafka 集群地址 props.put( group.id , ke // 指定消费者组 props.put( enable.auto.commit , true // 开启自动提交 props.put( auto.commit.interval.ms , 1000 // 自动提交的时间间隔 // 反序列化消息主键 props.put( key.deserializer , org.apache.kafka.common.serialization.StringDeserializer // 反序列化消费记录 props.put( value.deserializer , org.apache.kafka.common.serialization.StringDeserializer return props; } /** 实现一个单线程消费者. */ @Override public void run() { // 创建一个消费者实例对象 KafkaConsumer String, String consumer = new KafkaConsumer (configure()); // 订阅消费主题集合 consumer.subscribe(Arrays.asList( test_kafka_topic)); // 实时消费标识 boolean flag = true; while (flag) { // 获取主题消息数据 ConsumerRecords String, String records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord String, String record : records) // 循环打印消息记录 System.out.printf(offset = %d, key = %s, value = %s%n , record.offset(), record.key(), record.value()); } // 出现异常关闭消费者对象 consumer.close(); }}
上述代码我们就可以非常便捷地拿到 Topic 中的数据。但是,当我们调用 poll 方法拉取数据的时候,Kafka Broker Server 做了那些事情。接下来,我们可以去看看源代码的实现细节。核心代码如下:
org.apache.kafka.clients.consumer.KafkaConsumer
private ConsumerRecords K, V poll(final long timeoutMs, final boolean includeMetadataInTimeout) { acquireAndEnsureOpen(); try { if (timeoutMs 0) throw new IllegalArgumentException(Timeout must not be negative if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { throw new IllegalStateException( Consumer is not subscribed to any topics or assigned any partitions } // poll for new data until the timeout expires long elapsedTime = 0L; do { client.maybeTriggerWakeup(); final long metadataEnd; if (includeMetadataInTimeout) { final long metadataStart = time.milliseconds(); if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) { return ConsumerRecords.empty(); } metadataEnd = time.milliseconds(); elapsedTime += metadataEnd - metadataStart; } else { while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) { log.warn( Still waiting for metadata } metadataEnd = time.milliseconds(); } final Map TopicPartition, List ConsumerRecord K, V records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime)); if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user // is handling the fetched records. // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. if (fetcher.sendFetches() 0 || client.hasPendingRequests()) { client.pollNoWakeup(); } return this.interceptors.onConsume(new ConsumerRecords (records)); } final long fetchEnd = time.milliseconds(); elapsedTime += fetchEnd - metadataEnd; } while (elapsedTime timeoutMs); return ConsumerRecords.empty(); } finally { release(); } }
上述代码中有个方法 pollForFetches,它的实现逻辑如下:
private Map TopicPartition, List ConsumerRecord K, V pollForFetches(final long timeoutMs) { final long startMs = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs); // if data is available already, return it immediately final Map TopicPartition, List ConsumerRecord K, V records = fetcher.fetchedRecords(); if (!records.isEmpty()) { return records; } // send any new fetches (won t resend pending fetches) fetcher.sendFetches(); // We do not want to be stuck blocking in poll if we are missing some positions // since the offset lookup may be backing off after a failure // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call // updateAssignmentMetadataIfNeeded before this method. if (!cachedSubscriptionHashAllFetchPositions pollTimeout retryBackoffMs) { pollTimeout = retryBackoffMs; } client.poll(pollTimeout, startMs, () - { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); }); // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster if (coordinator.rejoinNeededOrPending()) { return Collections.emptyMap(); } return fetcher.fetchedRecords(); }
上述代码中加粗的位置,我们可以看出每次消费者客户端拉取数据时,通过 poll 方法,先调用 fetcher 中的 fetchedRecords 函数,如果获取不到数据,就会发起一个新的 sendFetches 请求。而在消费数据的时候,每个批次从 Kafka Broker Server 中拉取数据是有最大数据量限制,默认是 500 条,由属性 (max.poll.records) 控制,可以在客户端中设置该属性值来调整我们消费时每次拉取数据的量。
提示:这里需要注意的是,max.poll.records 返回的是一个 poll 请求的数据总和,与多少个分区无关。因此,每次消费从所有分区中拉取 Topic 的数据的总条数不会超过 max.poll.records 所设置的值。
而在 Fetcher 的类中,在 sendFetches 方法中有限制拉取数据容量的限制,由属性(max.partition.fetch.bytes),默认 1MB。可能会有这样一个场景,当满足 max.partition.fetch.bytes 限制条件,如果需要 Fetch 出 10000 条记录,每次默认 500 条,那么我们需要执行 20 次才能将这一次通过网络发起的请求全部 Fetch 完毕。
这里,可能有同学有疑问,我们不能将默认的 max.poll.records 属性值调到 10000 吗? 可以调,但是还有个属性需要一起配合才可以,这个就是每次 poll 的超时时间(Duration.ofMillis(100)),这里需要根据你的实际每条数据的容量大小来确定设置超时时间,如果你将最大值调到 10000,当你每条记录的容量很大时,超时时间还是 100ms,那么可能拉取的数据少于 10000 条。
而这里,还有另外一个需要注意的事情,就是会话超时的问题。session.timeout.ms 默认是 10s,group.min.session.timeout.ms 默认是 6s,group.max.session.timeout.ms 默认是 30min。当你在处理消费的业务逻辑的时候,如果在 10s 内没有处理完,那么消费者客户端就会与 Kafka Broker Server 断开,消费掉的数据,产生的 offset 就没法提交给 Kafka,因为 Kafka Broker Server 此时认为该消费者程序已经断开,而即使你设置了自动提交属性,或者设置 auto.offset.reset 属性,你消费的时候还是会出现重复消费的情况,这就是因为 session.timeout.ms 超时的原因导致的。
2、心跳机制
上面在末尾的时候,说到会话超时的情况导致消息重复消费,为什么会有超时? 有同学会有这样的疑问,我的消费者线程明明是启动的,也没有退出,为啥消费不到 Kafka 的消息呢? 消费者组也查不到我的 ConsumerGroupID 呢? 这就有可能是超时导致的,而 Kafka 是通过心跳机制来控制超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。
在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator 中会启动一个 HeartbeatThread 线程来定时发送心跳和检测消费者的状态。每个消费者都有个 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,而每个 ConsumerCoordinator 都会启动一个 HeartbeatThread 线程来维护心跳,心跳信息存放在 org.apache.kafka.clients.consumer.internals.Heartbeat 中,声明的 Schema 如下所示:
private final int sessionTimeoutMs; private final int heartbeatIntervalMs; private final int maxPollIntervalMs; private final long retryBackoffMs; private volatile long lastHeartbeatSend; private long lastHeartbeatReceive; private long lastSessionReset; private long lastPoll; private boolean heartbeatFailed;
心跳线程中的 run 方法实现代码如下:
public void run() { try { log.debug( Heartbeat thread started while (true) { synchronized (AbstractCoordinator.this) { if (closed) return; if (!enabled) { AbstractCoordinator.this.wait(); continue; } if (state != MemberState.STABLE) { // the group is not stable (perhaps because we left the group or because the coordinator // kicked us out), so disable heartbeats and wait for the main thread to rejoin. disable(); continue; } client.pollNoWakeup(); long now = time.milliseconds(); if (coordinatorUnknown()) { if (findCoordinatorFuture != null || lookupCoordinator().failed()) // the immediate future check ensures that we backoff properly in the case that no // brokers are available to connect to. AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should // probably make sure the coordinator is still healthy. markCoordinatorUnknown(); } else if (heartbeat.pollTimeoutExpired(now)) { // the poll timeout has expired, which means that the foreground thread has stalled // in between calls to poll(), so we explicitly leave the group. maybeLeaveGroup(); } else if (!heartbeat.shouldHeartbeat(now)) { // poll again after waiting for the retry backoff in case the heartbeat failed or the // coordinator disconnected AbstractCoordinator.this.wait(retryBackoffMs); } else { heartbeat.sentHeartbeat(now); sendHeartbeatRequest().addListener(new RequestFutureListener() { @Override public void onSuccess(Void value) { synchronized (AbstractCoordinator.this) { heartbeat.receiveHeartbeat(time.milliseconds()); } } @Override public void onFailure(RuntimeException e) { synchronized (AbstractCoordinator.this) { if (e instanceof RebalanceInProgressException) { // it is valid to continue heartbeating while the group is rebalancing. This // ensures that the coordinator keeps the member in the group for as long // as the duration of the rebalance timeout. If we stop sending heartbeats, // however, then the session timeout may expire before we can rejoin. heartbeat.receiveHeartbeat(time.milliseconds()); } else { heartbeat.failHeartbeat(); // wake up the thread if it s sleeping to reschedule the heartbeat AbstractCoordinator.this.notify(); } } } }); } } } } catch (AuthenticationException e) { log.error( An authentication error occurred in the heartbeat thread , e); this.failed.set(e); } catch (GroupAuthorizationException e) { log.error( A group authorization error occurred in the heartbeat thread , e); this.failed.set(e); } catch (InterruptedException | InterruptException e) { Thread.interrupted(); log.error(Unexpected interrupt received in heartbeat thread , e); this.failed.set(new RuntimeException(e)); } catch (Throwable e) { log.error( Heartbeat thread failed due to unexpected error , e); if (e instanceof RuntimeException) this.failed.set((RuntimeException) e); else this.failed.set(new RuntimeException(e)); } finally { log.debug( Heartbeat thread has closed } }
在心跳线程中这里面包含两个最重要的超时函数,它们是 sessionTimeoutExpired 和 pollTimeoutExpired。
public boolean sessionTimeoutExpired(long now) { return now - Math.max(lastSessionReset, lastHeartbeatReceive) sessionTimeoutMs; }public boolean pollTimeoutExpired(long now) { return now - lastPoll maxPollIntervalMs; }
2.1、sessionTimeoutExpired
如果是 sessionTimeout 超时,则会被标记为当前协调器处理断开,此时,会将消费者移除,重新分配分区和消费者的对应关系。在 Kafka Broker Server 中,Consumer Group 定义了 5 种 (如果算上 Unknown,应该是 6 种状态) 状态,org.apache.kafka.common.ConsumerGroupState,如下图所示:
2.2、pollTimeoutExpired
如果触发了 poll 超时,此时消费者客户端会退出 ConsumerGroup,当再次 poll 的时候,会重新加入到 ConsumerGroup,触发 RebalanceGroup。而 KafkaConsumer Client 是不会帮我们重复 poll 的,需要我们自己在实现的消费逻辑中不停地调用 poll 方法。
3. 分区与消费线程
关于消费分区与消费线程的对应关系,理论上消费线程数应该小于等于分区数。之前是有这样一种观点,一个消费线程对应一个分区,当消费线程等于分区数是最大化线程的利用率。直接使用 KafkaConsumer Client 实例,这样使用确实没有什么问题。但是,如果我们有富裕的 CPU,其实还可以使用大于分区数的线程,来提升消费能力,这就需要我们对 KafkaConsumer Client 实例进行改造,实现消费策略预计算,利用额外的 CPU 开启更多的线程,来实现消费任务分片。
看完上述内容,你们掌握 Kafka 消费与心跳机制如何理解的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注丸趣 TV 行业资讯频道,感谢各位的阅读!