Storm

73次阅读
没有评论

共计 4003 个字符,预计需要花费 11 分钟才能阅读完成。

Storm-kafka 中如何理解 ZkCoordinator 的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

梳理 ZkCoordinator 的过程

package com.mixbox.storm.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
import java.util.*;
import static com.mixbox.storm.kafka.KafkaUtils.taskId;
 * 
 * 
 * ZKCoordinator  协调器
 * 
 * @author Yin Shuai
 */
public class ZkCoordinator implements PartitionCoordinator {
 public static final Logger LOG = LoggerFactory
 .getLogger(ZkCoordinator.class);
 SpoutConfig _spoutConfig;
 int _taskIndex;
 int _totalTasks;
 String _topologyInstanceId;
 //  每一个分区对应着一个分区管理器
 Map Partition, PartitionManager  _managers = new HashMap();
 // 缓存的 List
 List PartitionManager  _cachedList;
 // 上次刷新的时间
 Long _lastRefreshTime = null;
 // 刷新频率   毫秒
 int _refreshFreqMs;
 // 动态分区连接
 DynamicPartitionConnections _connections;
 // 动态 BrokersReader
 DynamicBrokersReader _reader;

public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader;  * @param stormConf  * @param spoutConfig  * @return  */ private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {ZkHosts hosts = (ZkHosts) spoutConfig.hosts; return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); @Override public List PartitionManager  getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime)   _refreshFreqMs) {refresh(); _lastRefreshTime = System.currentTimeMillis(); return _cachedList;  *  简单的刷新的行为  *   */ void refresh() { try {LOG.info(taskId(_taskIndex, _totalTasks) +  Refreshing partition manager connections //  拿到所有的分区信息 GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo(); //  拿到自己任务的所有分区 List Partition  mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); //  拿到当前任务的分区 Set Partition  curr = _managers.keySet(); //  构造一个集合 Set Partition  newPartitions = new HashSet Partition (mine); //  在 new 分区中,移除掉所有   自己拥有的分区 newPartitions.removeAll(curr); //  要删除的分区 Set Partition  deletedPartitions = new HashSet Partition (curr); deletedPartitions.removeAll(mine); LOG.info(taskId(_taskIndex, _totalTasks) +  Deleted partition managers:  + deletedPartitions.toString()); for (Partition id : deletedPartitions) {PartitionManager man = _managers.remove(id); man.close(); LOG.info(taskId(_taskIndex, _totalTasks) +  New partition managers:   + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); } catch (Exception e) {throw new RuntimeException(e); _cachedList = new ArrayList PartitionManager (_managers.values()); LOG.info(taskId(_taskIndex, _totalTasks) +  Finished refreshing @Override public PartitionManager getManager(Partition partition) {return _managers.get(partition); }

   1:首先 ZKCoorDinator 实现  PartitionCoordinator 的接口

package com.mixbox.storm.kafka;
import java.util.List;
 * @author Yin Shuai
 */
public interface PartitionCoordinator { *  拿到我管理的分区列表  List{PartitionManager}
  * @return
  */
 List PartitionManager  getMyManagedPartitions();

PartitionManager getManager(Partition partition); }

          第一个方法拿到所有的   PartitionManager

          第二个方法依据特定的   Partition 去得到一个分区管理器

关于 Storm-kafka 中如何理解 ZkCoordinator 的过程问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注丸趣 TV 行业资讯频道了解更多相关知识。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-17发表,共计4003字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)