Storm如何实现单词计数

84次阅读
没有评论

共计 6895 个字符,预计需要花费 18 分钟才能阅读完成。

这篇文章主要介绍“Storm 如何实现单词计数”,在日常操作中,相信很多人在 Storm 如何实现单词计数问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm 如何实现单词计数”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

1. 使用 mvn 命令创建项目

mvn archetype:generate -DgroupId=storm.test -DartifactId=Storm01 -DpackageName=com.zhch.v1

然后编辑配置文件 pom.xml,添加 storm 依赖  

dependency 
  groupId org.apache.storm /groupId 
  artifactId storm-core /artifactId 
  version 0.9.4 /version 
 /dependency

最后通过下述命令来编译项目,编译正确完成后导入到 IDE 中  

mvn install

当然,也可以在 IDE 中安装 maven 插件,从而直接在 IDE 中创建 maven 项目  

2. 实现数据源,用重复的静态语句来模拟数据源

package storm.test.v1;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SentenceSpout extends BaseRichSpout { private String[] sentences = {
  storm integrates with the queueing ,
  and database technologies you already use ,
  a storm topology consumes streams of data ,
  and processes those streams in arbitrarily complex ways ,
  repartitioning the streams between each stage of the computation however needed 
 };
 private int index = 0;
 private SpoutOutputCollector collector;
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( sentence));
 }
 @Override
 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
 this.collector = spoutOutputCollector;
 }
 @Override
 public void nextTuple() { this.collector.emit(new Values(sentences[index]));
 index++;
 if (index  = sentences.length) {
 index = 0;
 }
 try { Thread.sleep(1);
 } catch (InterruptedException e) { }
 }
}

3. 实现语句分割 bolt 

package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
 private OutputCollector collector;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 }
 @Override
 public void execute(Tuple tuple) {
 String sentence = tuple.getStringByField( sentence 
 String[] words = sentence.split(   
 for (String word : words) { this.collector.emit(new Values(word));
 }
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word));
 }
}

4. 实现单词计数 bolt 

package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
 private OutputCollector collector;
 private HashMap String, Long  counts = null;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 this.counts = new HashMap String, Long 
 }
 @Override
 public void execute(Tuple tuple) {
 String word = tuple.getStringByField( word 
 Long count = this.counts.get(word);
 if (count == null) {
 count = 0L;
 }
 count++;
 this.counts.put(word, count);
 this.collector.emit(new Values(word, count));
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word ,  count));
 }
}

5. 实现上报 bolt 

package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReportBolt extends BaseRichBolt {
 private HashMap String, Long  counts = null;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 counts = new HashMap String, Long 
 }
 @Override
 public void execute(Tuple tuple) {
 String word = tuple.getStringByField( word 
 Long count = tuple.getLongByField( count 
 this.counts.put(word, count);
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
 @Override
 public void cleanup() { // 本地模式下,终止 topology 时可以保证 cleanup() 被执行
 System.out.println( --- FINAL COUNTS --- 
 List String  keys = new ArrayList String 
 keys.addAll(this.counts.keySet());
 Collections.sort(keys);
 for (String key : keys) { System.out.println(key +   :   + this.counts.get(key));
 }
 System.out.println( ---------- 
 }
}

6. 实现单词计数 topology 

package storm.test.v1;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class WordCountTopology {
 private static final String SENTENCE_SPOUT_ID =  sentence-spout 
 private static final String SPLIT_BOLT_ID =  split-bolt 
 private static final String COUNT_BOLT_ID =  count-bolt 
 private static final String REPORT_BOLT_ID =  report-bolt 
 private static final String TOPOLOGY_NAME =  word-count-topology 
 public static void main(String[] args) { SentenceSpout spout = new SentenceSpout();
 SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
 WordCountBolt countBolt = new WordCountBolt();
 ReportBolt reportBolt = new ReportBolt();
 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(SENTENCE_SPOUT_ID, spout); // 注册数据源
 builder.setBolt(SPLIT_BOLT_ID, spiltBolt) // 注册 bolt
 .shuffleGrouping(SENTENCE_SPOUT_ID); // 该 bolt 订阅 spout 随机均匀发射来的数据流
 builder.setBolt(COUNT_BOLT_ID, countBolt)
 .fieldsGrouping(SPLIT_BOLT_ID, new Fields( word)); // 该 bolt 订阅 spiltBolt 发射来的数据流,并且保证 word 字段值相同的 tuple 会被路由到同一个 countBolt
 builder.setBolt(REPORT_BOLT_ID, reportBolt)
 .globalGrouping(COUNT_BOLT_ID); // 该 bolt 订阅 countBolt 发射来的数据流,并且所有的 tuple 都会被路由到唯一的一个 reportBolt 中
 Config config = new Config();
 // 本地模式启动
 LocalCluster cluster = new LocalCluster();
 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
 try { Thread.sleep(5 * 1000);
 } catch (InterruptedException e) { }
 cluster.killTopology(TOPOLOGY_NAME);
 cluster.shutdown();
 }
}

7. 运行结果:

--- FINAL COUNTS ---
a : 302
already : 302
and : 604
arbitrarily : 302
between : 302
complex : 302
computation : 302
consumes : 302
data : 302
database : 302
each : 302
however : 302
in : 302
integrates : 302
needed : 302
of : 604
processes : 302
queueing : 302
repartitioning : 302
stage : 302
storm : 604
streams : 906
technologies : 302
the : 906
those : 302
topology : 302
use : 302
ways : 302
with : 302
you : 302
----------

到此,关于“Storm 如何实现单词计数”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计6895字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)