共计 2360 个字符,预计需要花费 6 分钟才能阅读完成。
这篇文章主要介绍 Storm-kafka 中如何封装 DynamicBrokerReader 类,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
在细节上把握 DynamicBrokerReder 的封装类 – ZkBrokerReader
package com.mixbox.storm.kafka.trident;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.DynamicBrokersReader;
import com.mixbox.storm.kafka.ZkHosts;
import java.util.Map;
* 2014/07/22
* 在 ZK 中间拿到 GlobalPartitionInformation
*
* ZkBrokerReader 是对于 DynamicBrokersReader 的一个简单的封装
* @author Yin Shuai
*/
public class ZkBrokerReader implements IBrokerReader {
public static final Logger LOG = LoggerFactory
.getLogger(ZkBrokerReader.class);
GlobalPartitionInformation cachedBrokers;
public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
reader = new DynamicBrokersReader(conf, hosts.brokerZkStr,
hosts.brokerZkPath, topic);
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = System.currentTimeMillis();
refreshMillis = hosts.refreshFreqSecs * 1000L;
@Override
public GlobalPartitionInformation getCurrentBrokers() {long currTime = System.currentTimeMillis();
// 很简单, 指定了你多长时间开始去刷新 Brokerlibiao
if (currTime lastRefreshTimeMs + refreshMillis) {
LOG.info( brokers need refreshing because + refreshMillis
+ ms have expired
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = currTime;
return cachedBrokers;
@Override
public void close() {reader.close();
}
总览我们的 Code:
ZkBrokerReader 是对于 DynamicBrokersReader 的一个简单封装,ZkBrokerReader 之中持有 2 个主要的 Class
1 GlobalPartitionInformatio cachedBroker;
2 DynamicBrokersReader reader;
3 long lastRefreshTimeMs; 最新的刷新时间
lastRefreshTimeMs = System.currentTimeMillis(); 最新的刷新时间为系统的当前时间
4 long refreshMillis
refreshMillis = host.refreshFreqSecs * 1000L 设定刷新的毫秒数为
5
public GlobalPartitionInformation getCurrentBrokers() {long currTime = System.currentTimeMillis();
// 很简单, 指定了你多长时间开始去刷新 Brokerlibiao
if (currTime lastRefreshTimeMs + refreshMillis) {
LOG.info( brokers need refreshing because + refreshMillis
+ ms have expired
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = currTime;
return cachedBrokers;
}
每一次调用 getCurrentBrokers,首先会取 System.currentTimeMillis 当当前的系统时间超过了 最早的刷新时间 + 刷新
的间隔,就会再次的去跟新:
cachedBrokers = reader.getBrokerInfo();getBrokerInfo() 方法每调用一次,也就重新在 zk 之中重新去取
一次。
ZkBrokerReader 是对于 DynamicBrokerReader 的一个封装,DynamicBrokerReader 的 Dynamic 性质并不程序动态的因数,而只是简单在读取 ZK 数据的过程之中,Zk 数据已经动态的发生变化?
以上是“Storm-kafka 中如何封装 DynamicBrokerReader 类”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注丸趣 TV 行业资讯频道!