共计 6103 个字符,预计需要花费 16 分钟才能阅读完成。
这篇文章将为大家详细讲解有关 Storm 如何和 Kafka 进行整合,文章内容质量较高,因此丸趣 TV 小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
Storm 如何与 Kafka 进行整合:下面先给出 KafkaSpout 的源码(package com.mixbox.storm.kafka)。
package com.mixbox.storm.kafka;
import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;
import java.util.*;
* @author Yin Shuai
*/
public class KafkaSpout extends BaseRichSpout {public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
* 内部类,Message 和 Offset 的偏移量对象
*
* @author Yin Shuai
*/
public static class MessageAndRealOffset {
public Message msg;
public long offset;
public MessageAndRealOffset(Message msg, long offset) {
this.msg = msg;
this.offset = offset;
* 发射的枚举类
* @author Yin Shuai
*/
static enum EmitState {
EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
String _uuid = UUID.randomUUID().toString();
SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
// 分区的协调器,getMyManagedPartitions 拿到我所管理的分区
PartitionCoordinator _coordinator;
// 动态的分区链接:保存到 kafka 各个节点的连接,以及负责的 topic 的 partition 号码
DynamicPartitionConnections _connections;
// 提供了从 zookeeper 读写 kafka 消费者信息的功能
ZkState _state;
// 上次更新的毫秒数
long _lastUpdateMs = 0;
// 当前的分区
int _currPartitionIndex = 0;
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;
@SuppressWarnings(unchecked)
@Override
public void open(Map conf, final TopologyContext context,
final SpoutOutputCollector collector) {
_collector = collector;
List String zkServers = _spoutConfig.zkServers;
// 初始化的时候如果 zkServers 为空,那么初始化 默认的配置 Zookeeper
if (zkServers == null) {zkServers = new ArrayList String () {
add( 192.168.50.144
add( 192.168.50.169
add( 192.168.50.207
// zkServers =
// (List String)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
System.out.println( 使用的是 Storm 默认配置的 Zookeeper List : + zkServers);
Integer zkPort = _spoutConfig.zkPort;
// 在这里我们也同时 来检查 zookeeper 的端口是否为空
if (zkPort == null) {
zkPort = 2181;
// zkPort = ((Number)
// conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
Map stateConf = new HashMap(conf);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
// 通过保存的配置文件,我们持有了一个 zookeeper 的 state,支持节点内容的创建和删除
_state = new ZkState(stateConf);
// 对于连接的维护
_connections = new DynamicPartitionConnections(_spoutConfig,
KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
// 拿到总共的任务次数
int totalTasks = context
.getComponentTasks(context.getThisComponentId()).size();
// 判断当前的主机是否是静态的 statichost
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
totalTasks, _uuid);
// 当你拿到的 spoutConfig 是 zkhost 的时候
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
_state, context.getThisTaskIndex(), totalTasks, _uuid);
context.registerMetric(kafkaOffset , new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
@Override
public Object getValueAndReset() {
List PartitionManager pms = _coordinator
.getMyManagedPartitions();
Set Partition latestPartitions = new HashSet();
for (PartitionManager pm : pms) {latestPartitions.add(pm.getPartition());
_kafkaOffsetMetric.refreshPartitions(latestPartitions);
for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
return _kafkaOffsetMetric.getValueAndReset();}, _spoutConfig.metricsTimeBucketSizeInSecs);
context.registerMetric(kafkaPartition , new IMetric() {
@Override
public Object getValueAndReset() {
List PartitionManager pms = _coordinator
.getMyManagedPartitions();
Map concatMetricsDataMaps = new HashMap();
for (PartitionManager pm : pms) {concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
return concatMetricsDataMaps;
}, _spoutConfig.metricsTimeBucketSizeInSecs);
@Override
public void close() {_state.close();
@Override
public void nextTuple() {
// Storm-spout 是从 kafka 消费数据, 把 kafka 的 consumer
// 当成是一个 spout,并且向其他的 bolt 的发送数据
// 拿到当前我管理的这些 PartitionsManager
List PartitionManager managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i managers.size(); i++) {
// 对于每一个分区的 PartitionManager
// in case the number of managers decreased
// 当前的分区
_currPartitionIndex = _currPartitionIndex % managers.size();
// 拿到当前的分区,并且发送,这里把 SpoutOutputCollector 传递进去了,由他发射元祖
EmitState state = managers.get(_currPartitionIndex)
.next(_collector);
// 如果发送状态为:发送 - 还有剩余
if (state != EmitState.EMITTED_MORE_LEFT) {_currPartitionIndex = (_currPartitionIndex + 1)
% managers.size();
// 如果发送的状态为: 发送 - 没有剩余
if (state != EmitState.NO_EMITTED) {
break;
long now = System.currentTimeMillis();
if ((now - _lastUpdateMs) _spoutConfig.stateUpdateIntervalMs) {commit();
@Override
public void ack(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {m.ack(id.offset);
@Override
public void fail(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {m.fail(id.offset);
@Override
public void deactivate() {
// 停止工作
commit();
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {System.out.println(_spoutConfig.scheme.getOutputFields());
declarer.declare(_spoutConfig.scheme.getOutputFields());
private void commit() {_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {manager.commit();
}
在粗浅的代码阅读之后,在这里进行详细的分析:
1. KafkaSpout 中定义了一个静态内部类 MessageAndRealOffset,它把一条 Kafka 消息(Message)与其偏移量(offset)封装在一起:
public static class MessageAndRealOffset
public Message msg;
public long offset;
public MessageAndRealOffset(Message msg,long offset)
{
this.msg = msg;
this.offset = offset;
}
}
2. Spout 中还持有一个分区协调器 PartitionCoordinator:当 SpoutConfig.hosts 为 StaticHosts 时实例化 StaticCoordinator,否则(默认情况)实例化 ZkCoordinator。
关于 Storm 如何和 Kafka 进行整合就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
正文完