怎么使用Storm

63次阅读
没有评论

共计 4406 个字符,预计需要花费 12 分钟才能阅读完成。

这篇文章主要介绍“怎么使用 Storm”,在日常操作中,相信很多人在怎么使用 Storm 问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么使用 Storm”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

项目 Pom(Storm jar 没有提交到 Maven 中央仓库,需要在项目中加入下面的仓库地址):

repositories 
  repository 
  id central /id 
  name Maven Repository Switchboard /name 
  layout default /layout 
  url http://maven.oschina.net/content/groups/public/ /url 
  snapshots 
  enabled false /enabled 
  /snapshots 
  /repository 
  repository 
  id clojars /id 
  url https://clojars.org/repo/ /url 
  snapshots 
  enabled false /enabled 
  /snapshots 
  releases 
  enabled true /enabled 
  /releases 
  /repository 
 /repositories 
 dependencies 
  dependency 
  groupId org.yaml /groupId 
  artifactId snakeyaml /artifactId 
  version 1.13 /version 
  /dependency 
  dependency 
  groupId org.apache.zookeeper /groupId 
  artifactId zookeeper /artifactId 
  version 3.3.3 /version 
  /dependency 
  dependency 
  groupId org.clojure /groupId 
  artifactId clojure /artifactId 
  version 1.5.1 /version 
  /dependency 
  dependency 
  groupId storm /groupId 
  artifactId storm /artifactId 
  version 0.9.0.1 /version 
  /dependency 
  dependency 
  groupId storm /groupId 
  artifactId libthrift7 /artifactId 
  version 0.7.0 /version 
  /dependency 
 /dependencies

下面是一个 Storm 的 HelloWord 的例子,代码有删减,熟悉 Storm 的读者自然能把代码组织成一个完整的例子。

public static void main(String[] args) {Config conf = new Config();
 conf.put(Config.STORM_LOCAL_DIR,  /Volumes/Study/data/storm 
 conf.put(Config.STORM_CLUSTER_MODE,  local 
 //conf.put( storm.local.mode.zmq ,  false 
 conf.put( storm.zookeeper.root ,  /storm 
 conf.put(storm.zookeeper.session.timeout , 50000);
 conf.put( storm.zookeeper.servers ,  nowledgedata-n15 
 conf.put(storm.zookeeper.port , 2181);
 //conf.setDebug(true);
 //conf.setNumWorkers(2);
 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(words , new TestWordSpout(), 2); 
 builder.setBolt(exclaim2 , new DefaultStringBolt(), 5)
  .shuffleGrouping( words 
 LocalCluster cluster = new LocalCluster();
 cluster.submitTopology(test , conf, builder.createTopology());
}

Config.STORM_LOCAL_DIR 是配置一个本地路径,Storm 会在这个路径写入一些配置信息和临时数据。

Config.STORM_CLUSTER_MODE 是运行模式,local 和 distributed 两个选项,即本地模式和分布式模式。本地模式在运行时时多线程模拟的,开发测试用;分布式模式在分布式集群下是多进程的,真正的分布式。

Storm 的 Spout 和 Blot 高可用是通过 ZooKeeper 协调的,storm.zookeeper.root 是一个 ZooKeeper 地址,并且有对应的端口号

Debug 是测试模式,有更详细的日志信息。

TestWordSpout 是一个 Storm 自带的例子,用来随机的产生 code new String[] { nathan , mike , jackson , golda , bertels /code 列表中的字符串,用来提供数据源。

其中 DefaultStringBolt 的源码:

OutputCollector collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
 this.collector = collector;
public void execute(Tuple tuple) {log.info( rev a message:   + tuple.getString(0));
 collector.emit(tuple, new Values(tuple.getString(0) +  !!! ));
 collector.ack(tuple);
}

运行日志:

10658 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson 10658 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson 10758 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: mike 10758 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: nathan 10859 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: nathan 10859 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: bertels 10961 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson 10961 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson 11061 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: nathan 11062 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: nathan 11162 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: bertels 11163 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson

数据由一个 Storm 叫做喷嘴(Spout,也相当一个水龙头,能产生数据的来源端)产生,然后传递给后端一连串的的 Blot,最终被转换和消费。而 Spout 和 Blot 都是并行的,并行度都可以自己设置(本地运行是靠多线程模拟的)。如:

builder.setSpout(words , new TestWordSpout(), 2); 
builder.setBolt(exclaim2 , new DefaultStringBolt(), 5)

喷嘴 TestWordSpout 的并行度是 2,DefaultStringBolt 的并行度是 5.

从日志可以看出,数据经过喷嘴到达预先定于的一个 Blot,打印了日志。我测试代码设置的并行度是 5,日志中统计,确实是 5 个线程:

Thread-29-exclaim2

Thread-31-exclaim2

Thread-26-exclaim2

Thread-33-exclaim2

Thread-35-exclaim2

借用 OSC 网友的话说,Hadoop 就是商场里自动升降式的电梯,用户需要排队等待,选按楼层,然后到达;而 Storm 就像是自动扶梯,扶梯预先设置好运行后,来人就立即运走,目的地是明确的。

Storm 按我的理解,Storm 和 Hadoop 是完全不同的,设计上也没有半点拟合的部分。Storm 更像是我之前介绍过的 Spring Integration,是一个数据流系统。它能把数据按照预设定的流程,把数据做各种转换,传递,分解,合并,最后数据到达后端存储。只不过 Storm 是可以分布式,而且分布式的能力也是可以自己设置。

Storm 的这种特性很适合大数据类的 ETL 系统开发。

到此,关于“怎么使用 Storm”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计4406字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)