This post shows how to implement word counting in Storm.
Building on the previous word-count example, one change is made: a custom grouping strategy routes all words that share the same first letter to the same counting task.
The custom CustomStreamGrouping
package com.zhch.v4;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ModuleGrouping implements CustomStreamGrouping, Serializable {
    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List<Integer> targetTasks) {
        // Remember the task ids of the consuming bolt's instances
        this.tasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> taskIds = new ArrayList<Integer>();
        if (values.size() > 0) {
            String str = values.get(0).toString();
            if (str.isEmpty()) {
                taskIds.add(0);
            } else {
                // Words sharing a first letter get the same index, hence the same task
                Integer index = str.charAt(0) % tasks.size();
                taskIds.add(tasks.get(index));
            }
        }
        return taskIds;
    }
}
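Because chooseTasks only depends on the word's first character and the target task list, the mapping is easy to sanity-check outside of Storm. A minimal sketch follows; the demo class and the task ids 6 and 7 are made up for illustration (real ids arrive via prepare()):

package com.zhch.v4;

import java.util.Arrays;
import java.util.List;

public class ModuleGroupingDemo {
    public static void main(String[] args) {
        // Hypothetical task ids standing in for the two counter-bolt tasks
        List<Integer> tasks = Arrays.asList(6, 7);
        for (String word : Arrays.asList("Storm", "storm", "Hadoop", "hadoop")) {
            int index = word.charAt(0) % tasks.size();
            System.out.println(word + " -> task " + tasks.get(index));
        }
        // 'S' (83) and 's' (115) are both odd, so "Storm" and "storm" map to
        // task 7 here, while 'H' (72) and 'h' (104) are even and map to task 6.
        // With a task count other than 2, upper- and lower-case forms of the
        // same word can land on different tasks.
    }
}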
The data source spout
package com.zhch.v4;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]", e);
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            // The file has been fully read; idle instead of busy-spinning
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return;
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                // Keep the tuple until it is acked, so fail() can replay it
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        // Replay the failed tuple with the same message id
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}
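The pending map together with ack() and fail() gives this spout at-least-once delivery: Storm calls fail() for any tuple whose tree is not fully processed within the message timeout, and the spout re-emits it. The replay window can be tuned on the Config built in the topology below; a minimal sketch (30 seconds is the usual Storm default, but verify against your version):

// How long Storm waits for a tuple tree to be fully acked before failing it
config.setMessageTimeoutSecs(30);
// Number of acker tasks tracking tuple trees; 0 disables acking and replay
config.setNumAckers(1);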
The sentence-splitting bolt
package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // Anchor each word to the input tuple so a downstream failure replays the sentence
            collector.emit(tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
The word-counting bolt
package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        // Rewrite this task's result file with the current sorted counts
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            List<String> keys = new ArrayList<String>();
            keys.addAll(this.counts.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Long c = this.counts.get(key);
                writer.write(key + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
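One caveat in this bolt: every counter task writes to the same fixed path, so if Storm ever schedules both counter tasks on the same machine they will overwrite each other's result.txt. A minimal sketch of one way around that, using the task id Storm exposes in prepare() (the resultFile field and per-task filename are hypothetical, not part of the original code):

// In prepare(), derive a per-task output path (hypothetical resultFile field):
this.resultFile = "/home/grid/stormData/result-" + topologyContext.getThisTaskId() + ".txt";
// In execute(), write to that path instead of the shared one:
writer = new BufferedWriter(new FileWriter(resultFile));

In this run it happens not to matter, because the two counter tasks landed on different supervisors (hadoop5 and hadoop6), as the results below show.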
The word-count topology
package com.zhch.v4;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v4";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); // use the custom grouping strategy

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args != null && args.length > 1) {
            config.setNumWorkers(2);
            // cluster mode
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            // local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
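For comparison, the earlier version of this example presumably routed words with a fields grouping on the "word" field, which keys on the whole word rather than its first letter; switching between the two is a one-line change in the topology (snippet only, and it assumes import backtype.storm.tuple.Fields):

// Fields grouping: the same word always reaches the same task
builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
        .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

// Custom grouping: the same first letter always reaches the same task (this version)
builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
        .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping());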
Submit to the Storm cluster (args[0] is the input file path, args[1] the topology name)
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4
Results: each counter task keeps its own counts map and writes its own result.txt on the supervisor where it runs, so the output is split between hadoop5 and hadoop6:
[grid@hadoop5 stormData]$ cat result.txt
Apache : 1
ETL : 1
It : 1
Storm : 4
a : 4
analytics : 1
and : 5
any : 1
at : 1
can : 1
cases: : 1
clocked : 1
computation : 2
continuous : 1
easy : 2
guarantees : 1
is : 6
it : 2
machine : 1
makes : 1
many : 1
million : 1
more : 1
of : 2
online : 1
open : 1
operate : 1
over : 1
scalable : 1
second : 1
set : 1
simple : 1
source : 1
streams : 1
system : 1
unbounded : 1
up : 1
use : 2
used : 1
what : 1
will : 1
with : 1
your : 1
[grid@hadoop6 stormData]$ cat result.txt
Hadoop : 1
RPC : 1
batch : 1
be : 2
benchmark : 1
data : 2
did : 1
distributed : 2
doing : 1
fast: : 1
fault-tolerant : 1
for : 2
free : 1
fun : 1
has : 1
language : 1
learning : 1
lot : 1
node : 1
per : 2
process : 1
processed : 2
processing : 2
programming : 1
realtime : 3
reliably : 1
to : 3
torm : 1
tuples : 1
That completes the walkthrough of word counting in Storm; try running it yourself.