共计 2749 个字符,预计需要花费 7 分钟才能阅读完成。
本篇内容主要讲解“Kafka Consumer 使用要注意什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“Kafka Consumer 使用要注意什么”吧!
一、特点:
不用关心 offset, 会自动的读 zookeeper 中该 Consumer group 的 last offset
二、注意事项
1. 如果 consumer 比 partition 多,是浪费,因为 kafka 的设计是在一个 partition 上是不允许并发的,
所以 consumer 数不要大于 partition 数
2. 如果 consumer 比 partition 少,一个 consumer 会对应于多个 partitions,
这里主要合理分配 consumer 数和 partition 数,否则会导致 partition 里面的数据被取的不均匀
最好 partiton 数目是 consumer 数目的整数倍,所以 partition 数目很重要,
比如取 24,就很容易设定 consumer 数目
3. 如果 consumer 从多个 partition 读到数据,不保证数据间的顺序性,
kafka 只保证在一个 partition 上数据是有序的,但多个 partition,根据你读的顺序会有不同
4. 增减 consumer,broker,partition 会导致 rebalance,
所以 rebalance 后 consumer 对应的 partition 会发生变化
5. High-level 接口中获取不到数据的时候是会 block 的
三、代码如下:
package kafkatest.kakfademo;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerDemo1 {
public static void main(String[] args) {
ConsumerDemo1 demo = new ConsumerDemo1();
demo.test();
}
@SuppressWarnings(rawtypes)
public void test() {
String topicName = test
int numThreads = 1;
Properties properties = new Properties();
properties.put(zookeeper.connect , hadoop0:2181 // 声明 zk
properties.put(group.id , group–demo // 必须要使用别的组名称,
// 如果生产者和消费者都在同一组,则不能访问同一组内的 topic 数据
ConsumerConnector consumer = Consumer
.createJavaConsumerConnector(new ConsumerConfig(properties));
Map String, Integer topicCountMap = new HashMap String, Integer
topicCountMap.put(topicName, numThreads); // 一次从主题中获取一个数据
Map String, List KafkaStream byte[], byte[] messageStreams = consumer
.createMessageStreams(topicCountMap);
// 获取每次接收到的这个数据
List KafkaStream byte[], byte[] streams = messageStreams
.get(topicName);
// now launch all the threads
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.execute(new ConsumerMsgTask(stream, threadNumber));
threadNumber++;
}
}
class ConsumerMsgTask implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
m_threadNumber = threadNumber;
m_stream = stream;
}
public void run() {
ConsumerIterator byte[], byte[] it = m_stream.iterator();
long offset = 0;
try {
while (it.hasNext())
offset = it.next().offset();
byte[] bytes = it.next().message();
String msg = new String(bytes, UTF-8
System.out.print(offset: + offset + ,msg: + msg);
System.out.println(Shutting down Thread: + m_threadNumber);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
四、实验验证
到此,相信大家对“Kafka Consumer 使用要注意什么”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!