This article explains how to use the Direct Grouping strategy in Storm. With direct grouping, the producer of a tuple decides exactly which task of the consuming bolt receives it: the producer looks up the consumer's task IDs via TopologyContext.getComponentTasks(), emits with OutputCollector.emitDirect(), and the consumer subscribes to the stream with directGrouping(). The example below uses this to route all words that share the same first letter to the same task for counting.
The data source spout:
package com.zhch.v3;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]", e);
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return; // the file has already been fully read; don't re-read it
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);        // track the tuple until it is acked
                this.collector.emit(values, msgId);     // emit with a message ID to enable reliability
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        // re-emit the failed tuple from the pending map
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}
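Because every tuple is emitted together with a UUID message ID, Storm's reliability mechanism calls ack() on the spout once the tuple tree is fully processed, and fail() if it times out or a downstream bolt fails it; the pending map is what lets fail() re-emit the exact tuple. This is also why the split bolt below anchors its output to the input tuple and acks it in execute().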
The sentence-splitting bolt:
package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> numCounterTasks;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        // get the taskId list of the downstream bolt
        this.numCounterTasks = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID);
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word));
            // the producer picks the exact consumer task that receives this tuple
            collector.emitDirect(taskId, tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if (word.isEmpty())
            return 0;
        else {
            // first character of the word, modulo the number of downstream taskIds
            return word.charAt(0) % numCounterTasks.size();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
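To make the routing concrete, here is a minimal standalone sketch (hypothetical, not part of the topology) that mirrors getWordCountIndex() for the two count-bolt tasks configured in the topology below:

public class RoutingSketch {
    // standalone copy of the logic in SplitSentenceBolt.getWordCountIndex()
    static int indexFor(String word, int numTasks) {
        word = word.trim().toUpperCase();
        if (word.isEmpty()) return 0;
        return word.charAt(0) % numTasks;
    }

    public static void main(String[] args) {
        // with 2 downstream tasks: 'S' is 83, and 83 % 2 = 1
        System.out.println(indexFor("Storm", 2)); // 1
        System.out.println(indexFor("storm", 2)); // 1 -- same first letter, same task
        System.out.println(indexFor("to", 2));    // 0 -- 'T' is 84, and 84 % 2 = 0
    }
}

Since the index depends only on the upper-cased first character, every word with a given first letter is always handed to the same count-bolt task.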
The word-count bolt:
package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        // write the current counts to a local file on this task's node
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            Iterator<String> keys = this.counts.keySet().iterator();
            while (keys.hasNext()) {
                String w = keys.next();
                Long c = this.counts.get(w);
                writer.write(w + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
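One design note: this bolt rewrites the entire result.txt on every single tuple, which keeps the demo easy to inspect but would be a serious bottleneck under real load; batching the writes, or flushing only periodically, would be the usual refinement.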
The word-count topology:
package com.zhch.v3;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v3";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .directGrouping(SPLIT_BOLT_ID); // use the Direct Grouping strategy

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args != null && args.length > 1) {
            config.setNumWorkers(2);
            // launch in cluster mode
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            // launch in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
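One caveat: the Storm documentation states that direct groupings can only be used on streams that have been declared as direct streams. The code above follows the original article as-is; if your Storm version enforces this check, the split bolt's declaration would need the boolean direct flag, roughly like this:

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    // declare "word" as a direct stream so emitDirect() is legal on it
    outputFieldsDeclarer.declare(true, new Fields("word"));
}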
Submit to the Storm cluster:
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3
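Here /home/grid/stormData/input.txt becomes args[0] (the wordsFile path) and word-count-topology-v3 becomes args[1] (the topology name), so main() takes the cluster-mode branch with two workers.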
The results, written by the two count-bolt tasks on their respective nodes:
[grid@hadoop5 stormData]$ cat result.txt
second : 1
can : 1
set : 1
simple : 1
use : 2
unbounded : 1
used : 1
It : 1
Storm : 4
online : 1
cases: : 1
open : 1
Apache : 1
of : 2
over : 1
more : 1
clocked : 1
easy : 2
scalable : 1
any : 1
guarantees : 1
ETL : 1
million : 1
continuous : 1
is : 6
with : 1
it : 2
makes : 1
your : 1
a : 4
at : 1
machine : 1
analytics : 1
up : 1
and : 5
many : 1
system : 1
source : 1
what : 1
operate : 1
will : 1
computation : 2
streams : 1
[grid@hadoop6 stormData]$ cat result.txt
to : 3
for : 2
data : 2
distributed : 2
has : 1
free : 1
programming : 1
reliably : 1
fast: : 1
processing : 2
be : 2
Hadoop : 1
did : 1
fun : 1
learning : 1
torm : 1
process : 1
RPC : 1
node : 1
processed : 2
per : 2
realtime : 3
benchmark : 1
batch : 1
doing : 1
lot : 1
language : 1
tuples : 1
fault-tolerant : 1
This concludes the walkthrough of using the Direct Grouping strategy in Storm. As the two result files show, words that share a first letter always land in the same file, which is exactly what the direct grouping was meant to achieve.