共计 3821 个字符,预计需要花费 10 分钟才能阅读完成。
如何实现 Kafka 精确传递一次语义,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
我们都知道 Kafka 的吞吐量很大,但是 Kafka 究竟会不会丢失消息呢?又会不会重复消费消息呢?
有很多公司因为业务要求必须保证消息不丢失、不重复的到达,比如无人机实时监控系统,当无人机闯入机场区域,我们必须立刻报警,不允许消息丢失。而无人机离开禁飞区域后我们需要将及时报警解除。如果消息重复了呢,我们是否需要复杂的逻辑来自己处理消息重复的情况呢,这种情况恐怕相当复杂而难以处理。但是如果我们能保证消息 exactly once,那么一切都容易得多。
下面我们来简单了解一下消息传递语义,以及 kafka 的消息传递机制。
首先我们要了解的是 message delivery semantic 也就是消息传递语义。
这是一个通用的概念,也就是消息传递过程中消息传递的保证性。
分为三种:
最多一次(at most once): 消息可能丢失也可能被处理,但最多只会被处理一次。
可能丢失 不会重复
至少一次(at least once): 消息不会丢失,但可能被处理多次。
可能重复 不会丢失
精确传递一次(exactly once): 消息被处理且只会被处理一次。
不丢失 不重复 就一次
而 kafka 其实有两次消息传递,一次生产者发送消息给 kafka,一次消费者去 kafka 消费消息。
两次传递都会影响最终结果,
两次都是精确一次,最终结果才是精确一次。
两次中有一次会丢失消息,或者有一次会重复,那么最终的结果就是可能丢失或者重复的。
一、Produce 端消息传递
这是 producer 端的代码:
Properties properties = new Properties();
properties.put(bootstrap.servers , kafka01:9092,kafka02:9092
properties.put(acks , all
properties.put(retries , 0);
properties.put(batch.size , 16384);
properties.put(linger.ms , 1);
properties.put(buffer.memory , 33554432);
properties.put(key.serializer , org.apache.kafka.common.serialization.StringSerializer
properties.put(value.serializer , org.apache.kafka.common.serialization.StringSerializer
KafkaProducer String, String kafkaProducer = new KafkaProducer String, String (properties);
for (int i = 1; i = 600; i++) {
kafkaProducer.send(new ProducerRecord String, String ( z_test_20190430 , testkafka0613 +i));
System.out.println(testkafka +i);
}
kafkaProducer.close();
其中指定了一个参数 acks 可以有三个值选择:
0:producer 完全不管 broker 的处理结果 回调也就没有用了 并不能保证消息成功发送 但是这种吞吐量最高
all 或者 -1:leader broker 会等消息写入 并且 ISR 都写入后 才会响应,这种只要 ISR 有副本存活就肯定不会丢失,但吞吐量最低。
1:默认的值 leader broker 自己写入后就响应,不会等待 ISR 其他的副本写入,只要 leader broker 存活就不会丢失,即保证了不丢失,也保证了吞吐量。
所以设置为 0 时,实现了 at most once,而且从这边看只要保证集群稳定的情况下,不设置为 0,消息不会丢失。
但是还有一种情况就是消息成功写入,而这个时候由于网络问题 producer 没有收到写入成功的响应,producer 就会开启重试的操作,直到网络恢复,消息就发送了多次。这就是 at least once 了。
kafka producer 的参数 acks 的默认值为 1,所以默认的 producer 级别是 at least once。并不能 exactly once。
二、Consumer 端消息传递
consumer 是靠 offset 保证消息传递的。
consumer 消费的代码如下:
Properties props = new Properties();
props.put(bootstrap.servers , kafka01:9092,kafka02:9092
props.put(group.id , test
props.put(enable.auto.commit , true
props.put(auto.commit.interval.ms , 1000
props.put(key.deserializer , org.apache.kafka.common.serialization.StringDeserializer
props.put(value.deserializer , org.apache.kafka.common.serialization.StringDeserializer
props.put(auto.offset.reset , earliest
KafkaConsumer String, String consumer = new KafkaConsumer (props);
consumer.subscribe(Arrays.asList( foo , bar));
try{
while (true) {
ConsumerRecords String, String records = consumer.poll(1000);
for (ConsumerRecord String, String record : records) {
System.out.printf(offset = %d, key = %s, value = %s%n , record.offset(), record.key(), record.value());
}
}
}finally{
consumer.close();
}
其中有一个参数是 enable.auto.commit
若设置为 true consumer 在消费之前提交位移 就实现了 at most once
若是消费后提交 就实现了 at least once 默认的配置就是这个。
kafka consumer 的参数 enable.auto.commit 的默认值为 true ,所以默认的 consumer 级别是 at least once。也并不能 exactly once。
三、精确一次
通过了解 producer 端与 consumer 端的设置,我们发现 kafka 在两端的默认配置都是 at least once,肯能重复,通过配置的话呢也不能做到 exactly once,好像 kafka 的消息一定会丢失或者重复的,是不是没有办法做到 exactly once 了呢?
确实在 kafka 0.11.0.0 版本之前 producer 端确实是不可能的,但是在 kafka 0.11.0.0 版本之后,kafka 正式推出了 idempotent producer。
也就是幂等的 producer 还有对事务的支持。
幂等的 producer
kafka 0.11.0.0 版本引入了 idempotent producer 机制,在这个机制中同一消息可能被 producer 发送多次,但是在 broker 端只会写入一次,他为每一条消息编号去重,而且对 kafka 开销影响不大。
如何设置开启呢? 需要设置 producer 端的新参数 enable.idempotent 为 true。
而多分区的情况,我们需要保证原子性的写入多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚。
这时候就需要使用事务,在 producer 端设置 transcational.id 为一个指定字符串。
这样幂等 producer 只能保证单分区上无重复消息;事务可以保证多分区写入消息的完整性。
这样 producer 端实现了 exactly once,那么 consumer 端呢?
consumer 端由于可能无法消费事务中所有消息,并且消息可能被删除,所以事务并不能解决 consumer 端 exactly once 的问题,我们可能还是需要自己处理这方面的逻辑。比如自己管理 offset 的提交,不要自动提交,也是可以实现 exactly once 的。
还有一个选择就是使用 kafka 自己的流处理引擎,也就是 Kafka Streams,
设置 processing.guarantee=exactly_once,就可以轻松实现 exactly once 了。
看完上述内容,你们掌握如何实现 Kafka 精确传递一次语义的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注丸趣 TV 行业资讯频道,感谢各位的阅读!