kafka consumer怎么使用

64次阅读
没有评论

共计 4495 个字符,预计需要花费 12 分钟才能阅读完成。

这篇文章主要介绍“kafka consumer 怎么使用”,在日常操作中,相信很多人在 kafka consumer 怎么使用问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”kafka consumer 怎么使用”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

consumer 作为 kafka 当中一个重要元素,它的常用操作并不复杂,说白了无非就是 2 点,1、把数据 poll 出来,2、把位置标记上。我们找到 kafka 的 java api doc,找到了官方提供的几种 consumer 操作的例子,逐一进行分析,看看都有几种操作类型。

Automatic Offset Committing

自动 Offset 提交

这个例子显示了一个基于 offset 自动提交的 consumer api 的简单应用。

Properties props = new Properties();
 props.put( bootstrap.servers ,  localhost:9092 
 props.put( group.id ,  test 
 props.put( enable.auto.commit ,  true 
 props.put( auto.commit.interval.ms ,  1000 
 props.put( session.timeout.ms ,  30000 
 props.put( key.deserializer ,  org.apache.kafka.common.serialization.StringDeserializer 
 props.put( value.deserializer ,  org.apache.kafka.common.serialization.StringDeserializer 
 KafkaConsumer String, String  consumer = new KafkaConsumer (props);
 consumer.subscribe(Arrays.asList( foo ,  bar));
 while (true) { ConsumerRecords String, String  records = consumer.poll(100);
 for (ConsumerRecord String, String  record : records)
 System.out.printf(offset = %d, key = %s, value = %s , record.offset(), record.key(), record.value());
 }

enable.auto.commit 意味着 offset 将会得到自动提交,而这个自动提交的时间间隔由 auto.commit.interval.ms 来进行控制。

客户端通过 bootstrap.servers 的配置来连接服务器,这个配值当中可以是一个或多个 broker,需要注意的是,这个配置仅仅用来让客户端找到我们的 server 集群,而不需要把集群当中的所有服务器地址都列上。

在这个例子当中,客户端作为 test group 的一员,订阅了 foo 和 bar2 个 topic。

(这一段直接翻译很蹩脚,我会试着根据自己的理解翻译出来)首先假设,foo 和 bar 这 2 个 topic,都分别有 3 个 partitions,同时我们将上面的代码在我们的机器上起 3 个进程,也就是说,在 test group 当中,目前有了 3 个 consumer,一般来讲,这 3 个 consumer 会分别获得 foo 和 bar 的各一个 partitions,这是前提。3 个 consumer 会周期性的执行一个 poll 的动作(这个动作当中隐含的有一个 heartbeat 的发送,来告诉 cluster 我是活的),这样 3 个 consumer 会持续的保有他们对分配给自己的 partition 的访问的权利,如果某一个 consumer 失效了,也就是 poll 不再执行了,cluster 会在一段时间(session.timeout.ms)之后把 partitions 分配给其他的 consumer。

反序列化的设置,定义了如何转化 bytes,这里我们把 key 和 value 都直接转化为 string。

Manual Offset Control

手动的 offset 控制

除了周期性的自动提交 offset 之外,用户也可以在消息被消费了之后提交他们的 offset。

某些情况下,消息的消费是和某些处理逻辑相关联的,我们可以用这样的方式,手动的在处理逻辑结束之后提交 offset。

简要地说,在这个例子当中,我们希望每次至少消费 200 条消息并将它们插入数据库,之后再提交 offset。如果仍然使用前面的自动提交方式,就可能出现消息已经被消费,但是插入数据库失败的情况。这里可以视作一个简单的事务封装。

但是,有没有另一种可能性,在插入数据库成功之后,提交 offset 之前,发生了错误,或者说是提交 offset 本身发生了错误,那么就可能出现某些消息被重复消费的情况。

个人认为这段话说的莫名其妙,简单地说,采用这样的方式,消息不会被丢失,但是有可能出现重复消费。

Properties props = new Properties();
 props.put( bootstrap.servers ,  localhost:9092 
 props.put( group.id ,  test 
 props.put( enable.auto.commit ,  false 
 props.put( auto.commit.interval.ms ,  1000 
 props.put( session.timeout.ms ,  30000 
 props.put( key.deserializer ,  org.apache.kafka.common.serialization.StringDeserializer 
 props.put( value.deserializer ,  org.apache.kafka.common.serialization.StringDeserializer 
 KafkaConsumer String, String  consumer = new KafkaConsumer (props);
 consumer.subscribe(Arrays.asList( foo ,  bar));
 final int minBatchSize = 200;
 List ConsumerRecord String, String  buffer = new ArrayList ();
 while (true) { ConsumerRecords String, String  records = consumer.poll(100);
 for (ConsumerRecord String, String  record : records) { buffer.add(record);
 }
 if (buffer.size()  = minBatchSize) { insertIntoDb(buffer);
 consumer.commitSync();
 buffer.clear();
 }
 }

上面的例子当中,我们用 commitSync 来标记所有的消息;在有些情况下,我们可能希望更加精确的控制 offset,那么在下面的例子当中,我们可以在每一个 partition 当中分别控制 offset 的提交。

try { while(running) { ConsumerRecords String, String  records = consumer.poll(Long.MAX_VALUE);
 for (TopicPartition partition : records.partitions()) { List ConsumerRecord String, String  partitionRecords = records.records(partition);
 for (ConsumerRecord String, String  record : partitionRecords) { System.out.println(record.offset() +  :   + record.value());
 }
 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
 }
 }
 } finally { consumer.close();
 }

注意:提交的 offset 应该是 next message,所以,提交的时候需要在当前最后一条的基础上 +1.

Manual Partition Assignment

手动的分区分配

前面的例子当中,我们订阅一个 topic,然后让 kafka 把该 topic 当中的不同 partitions,公平的在一个 consumer group 内部进行分配。那么,在某些情况下,我们希望能够具体的指定 partitions 的分配关系。

如果某个进程在本地管理了和 partition 相关的状态,那么它只需要获得跟他相关 partition。

如果某个进程自身具备高可用性,那么就不需要 kafka 来检测错误并重新分配 partition,因为消费者进程会在另一台设备上重新启动。

要使用这种模式,可以用 assign 方法来代替 subscribe,具体指定一个 partitions 列表。

String topic =  foo 
 TopicPartition partition0 = new TopicPartition(topic, 0);
 TopicPartition partition1 = new TopicPartition(topic, 1);
 consumer.assign(Arrays.asList(partition0, partition1));

分配之后,就可以像前面的例子一样,在循环当中调用 poll 来消费消息。手动的分区分配不需要组协调,所以消费进程失效之后,不会引发 partition 的重新分配,每一个消费者都是独立工作的,即使它和其他消费者属于同一个 group。为了避免 offset 提交的冲突,在这种情况下,通常我们需要保证每一个 consumer 使用自己的 group id。

需要注意的是,手动 partition 分配和通过 subscribe 实现的动态的分区分配,2 种方式是不能混合使用的。

到此,关于“kafka consumer 怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计4495字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)