Storm如何和Kafka进行整合

64次阅读
没有评论

共计 6103 个字符,预计需要花费 16 分钟才能阅读完成。

这篇文章将为大家详细讲解有关 Storm 如何和 Kafka 进行整合,文章内容质量较高,因此丸趣 TV 小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

  对于 Storm 如何和 Kafka 进行整合

package com.mixbox.storm.kafka;
import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;
import java.util.*;
 * @author Yin Shuai
 */
public class KafkaSpout extends BaseRichSpout {public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
  *  内部类,Message 和 Offset 的偏移量对象
  * 
  * @author Yin Shuai
  */
 public static class MessageAndRealOffset {
 public Message msg;
 public long offset;
 public MessageAndRealOffset(Message msg, long offset) {
 this.msg = msg;
 this.offset = offset;
  *  发射的枚举类
  * @author Yin Shuai
  */
 static enum EmitState {
 EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
 String _uuid = UUID.randomUUID().toString();
 SpoutConfig _spoutConfig;
 SpoutOutputCollector _collector;
 //  分区的协调器,getMyManagedPartitions  拿到我所管理的分区
 PartitionCoordinator _coordinator;
 //  动态的分区链接:保存到 kafka 各个节点的连接,以及负责的 topic 的 partition 号码
 DynamicPartitionConnections _connections;
 //  提供了从 zookeeper 读写 kafka  消费者信息的功能
 ZkState _state;
 //  上次更新的毫秒数
 long _lastUpdateMs = 0;
 //  当前的分区
 int _currPartitionIndex = 0;
 public KafkaSpout(SpoutConfig spoutConf) {
 _spoutConfig = spoutConf;
 @SuppressWarnings(unchecked)
 @Override
 public void open(Map conf, final TopologyContext context,
 final SpoutOutputCollector collector) {
 _collector = collector;
 List String  zkServers = _spoutConfig.zkServers;
 //  初始化的时候如果 zkServers  为空,那么初始化   默认的配置 Zookeeper
 if (zkServers == null) {zkServers = new ArrayList String () {
 add( 192.168.50.144 
 add( 192.168.50.169 
 add( 192.168.50.207 
 // zkServers =
 // (List String)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
 System.out.println(  使用的是 Storm 默认配置的 Zookeeper List :   + zkServers);
 Integer zkPort = _spoutConfig.zkPort;
 //  在这里我们也同时   来检查 zookeeper 的端口是否为空
 if (zkPort == null) {
 zkPort = 2181;
 // zkPort = ((Number)
 // conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
 Map stateConf = new HashMap(conf);
 stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
 stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
 stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
 //  通过保存的配置文件,我们持有了一个 zookeeper 的 state,支持节点内容的创建和删除
 _state = new ZkState(stateConf);
 //  对于连接的维护
 _connections = new DynamicPartitionConnections(_spoutConfig,
 KafkaUtils.makeBrokerReader(conf, _spoutConfig));
 // using TransactionalState like this is a hack
 //  拿到总共的任务次数
 int totalTasks = context
 .getComponentTasks(context.getThisComponentId()).size();
 //  判断当前的主机是否是静态的 statichost
 if (_spoutConfig.hosts instanceof StaticHosts) {
 _coordinator = new StaticCoordinator(_connections, conf,
 _spoutConfig, _state, context.getThisTaskIndex(),
 totalTasks, _uuid);
 //  当你拿到的 spoutConfig 是 zkhost 的时候
 } else {
 _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
 _state, context.getThisTaskIndex(), totalTasks, _uuid);
 context.registerMetric(kafkaOffset , new IMetric() {
 KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
 @Override
 public Object getValueAndReset() {
 List PartitionManager  pms = _coordinator
 .getMyManagedPartitions();
 Set Partition  latestPartitions = new HashSet();
 for (PartitionManager pm : pms) {latestPartitions.add(pm.getPartition());
 _kafkaOffsetMetric.refreshPartitions(latestPartitions);
 for (PartitionManager pm : pms) {
 _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
 return _kafkaOffsetMetric.getValueAndReset();}, _spoutConfig.metricsTimeBucketSizeInSecs);
 context.registerMetric(kafkaPartition , new IMetric() {
 @Override
 public Object getValueAndReset() {
 List PartitionManager  pms = _coordinator
 .getMyManagedPartitions();
 Map concatMetricsDataMaps = new HashMap();
 for (PartitionManager pm : pms) {concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
 return concatMetricsDataMaps;
 }, _spoutConfig.metricsTimeBucketSizeInSecs);
 @Override
 public void close() {_state.close();
 @Override
 public void nextTuple() {
 // Storm-spout  是从 kafka  消费数据, 把  kafka  的  consumer
 //  当成是一个 spout,并且向其他的 bolt 的发送数据
 //  拿到当前我管理的这些 PartitionsManager
 List PartitionManager  managers = _coordinator.getMyManagedPartitions();
 for (int i = 0; i   managers.size(); i++) {
 //  对于每一个分区的  PartitionManager
 // in case the number of managers decreased
 //  当前的分区
 _currPartitionIndex = _currPartitionIndex % managers.size();
 //  拿到当前的分区,并且发送,这里把 SpoutOutputCollector 传递进去了,由他发射元祖
 EmitState state = managers.get(_currPartitionIndex)
 .next(_collector);
 //  如果发送状态为:发送 - 还有剩余
 if (state != EmitState.EMITTED_MORE_LEFT) {_currPartitionIndex = (_currPartitionIndex + 1)
 % managers.size();
 //  如果发送的状态为:  发送 - 没有剩余
 if (state != EmitState.NO_EMITTED) {
 break;
 long now = System.currentTimeMillis();
 if ((now - _lastUpdateMs)   _spoutConfig.stateUpdateIntervalMs) {commit();
 @Override
 public void ack(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;
 PartitionManager m = _coordinator.getManager(id.partition);
 if (m != null) {m.ack(id.offset);
 @Override
 public void fail(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;
 PartitionManager m = _coordinator.getManager(id.partition);
 if (m != null) {m.fail(id.offset);
 @Override
 public void deactivate() {
 //  停止工作
 commit();
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {System.out.println(_spoutConfig.scheme.getOutputFields());
 declarer.declare(_spoutConfig.scheme.getOutputFields());
 private void commit() {_lastUpdateMs = System.currentTimeMillis();
 for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {manager.commit();
}

        在粗浅的代码阅读之后,在这里进行详细的分析:

      1  KafkaSpout 之中持有了一个  MessageAndRealOffset 的内部类

public static class MessageAndRealOffset
 public Message msg;
 
 public long offset;
 
 public MessageAndRealOffset(Message msg,long offset)
 {
 this.msg = msg;
 this.offset = offset;
 }
}

    2 在 Spout 之中我们还持有了一个 PartitionCoordinator 的分区协调器,默认的情况我们实例化的对象

是 ZKCoordinator

 

关于 Storm 如何和 Kafka 进行整合就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-17发表,共计6103字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)