共计 6895 个字符,预计需要花费 18 分钟才能阅读完成。
这篇文章主要介绍“Storm 如何实现单词计数”,在日常操作中,相信很多人在 Storm 如何实现单词计数问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm 如何实现单词计数”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!
1. 使用 mvn 命令创建项目
mvn archetype:generate -DgroupId=storm.test -DartifactId=Storm01 -DpackageName=com.zhch.v1
然后编辑配置文件 pom.xml,添加 storm 依赖
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.4</version>
</dependency>
最后通过下述命令来编译项目,编译正确完成后导入到 IDE 中
mvn install
当然,也可以在 IDE 中安装 maven 插件,从而直接在 IDE 中创建 maven 项目
2. 实现数据源,用重复的静态语句来模拟数据源
package storm.test.v1;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SentenceSpout extends BaseRichSpout { private String[] sentences = {
storm integrates with the queueing ,
and database technologies you already use ,
a storm topology consumes streams of data ,
and processes those streams in arbitrarily complex ways ,
repartitioning the streams between each stage of the computation however needed
};
private int index = 0;
private SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( sentence));
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
@Override
public void nextTuple() { this.collector.emit(new Values(sentences[index]));
index++;
if (index = sentences.length) {
index = 0;
}
try { Thread.sleep(1);
} catch (InterruptedException e) { }
}
}
3. 实现语句分割 bolt
package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField( sentence
String[] words = sentence.split(
for (String word : words) { this.collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word));
}
}
4. 实现单词计数 bolt
package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private HashMap String, Long counts = null;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
this.counts = new HashMap String, Long
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField( word
Long count = this.counts.get(word);
if (count == null) {
count = 0L;
}
count++;
this.counts.put(word, count);
this.collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word , count));
}
}
5. 实现上报 bolt
package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReportBolt extends BaseRichBolt {
private HashMap String, Long counts = null;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
counts = new HashMap String, Long
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField( word
Long count = tuple.getLongByField( count
this.counts.put(word, count);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
@Override
public void cleanup() { // 本地模式下,终止 topology 时可以保证 cleanup() 被执行
System.out.println( --- FINAL COUNTS ---
List String keys = new ArrayList String
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for (String key : keys) { System.out.println(key + : + this.counts.get(key));
}
System.out.println( ----------
}
}
6. 实现单词计数 topology
package storm.test.v1;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class WordCountTopology {
private static final String SENTENCE_SPOUT_ID = sentence-spout
private static final String SPLIT_BOLT_ID = split-bolt
private static final String COUNT_BOLT_ID = count-bolt
private static final String REPORT_BOLT_ID = report-bolt
private static final String TOPOLOGY_NAME = word-count-topology
public static void main(String[] args) { SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, spout); // 注册数据源
builder.setBolt(SPLIT_BOLT_ID, spiltBolt) // 注册 bolt
.shuffleGrouping(SENTENCE_SPOUT_ID); // 该 bolt 订阅 spout 随机均匀发射来的数据流
builder.setBolt(COUNT_BOLT_ID, countBolt)
.fieldsGrouping(SPLIT_BOLT_ID, new Fields( word)); // 该 bolt 订阅 spiltBolt 发射来的数据流,并且保证 word 字段值相同的 tuple 会被路由到同一个 countBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt)
.globalGrouping(COUNT_BOLT_ID); // 该 bolt 订阅 countBolt 发射来的数据流,并且所有的 tuple 都会被路由到唯一的一个 reportBolt 中
Config config = new Config();
// 本地模式启动
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
try { Thread.sleep(5 * 1000);
} catch (InterruptedException e) { }
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
}
}
7. 运行结果:
--- FINAL COUNTS ---
a : 302
already : 302
and : 604
arbitrarily : 302
between : 302
complex : 302
computation : 302
consumes : 302
data : 302
database : 302
each : 302
however : 302
in : 302
integrates : 302
needed : 302
of : 604
processes : 302
queueing : 302
repartitioning : 302
stage : 302
storm : 604
streams : 906
technologies : 302
the : 906
those : 302
topology : 302
use : 302
ways : 302
with : 302
you : 302
----------
到此,关于“Storm 如何实现单词计数”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!
正文完