Storm中怎么使用Direct Grouping分组策略

72次阅读
没有评论

共计 7440 个字符,预计需要花费 19 分钟才能阅读完成。

这篇文章主要介绍“Storm 中怎么使用 Direct Grouping 分组策略”,在日常操作中,相信很多人在 Storm 中怎么使用 Direct Grouping 分组策略问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm 中怎么使用 Direct Grouping 分组策略”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

使用  Direct Grouping 分组策略,将首字母相同的单词发送给同一个 task 计数
数据源 spout
 

package com.zhch.v3;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class SentenceSpout extends BaseRichSpout {
 private FileReader fileReader = null;
 private boolean completed = false;
 private ConcurrentHashMap UUID, Values  pending;
 private SpoutOutputCollector collector;
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( sentence));
 }
 @Override
 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
 this.collector = spoutOutputCollector;
 this.pending = new ConcurrentHashMap UUID, Values 
 try { this.fileReader = new FileReader(map.get( wordsFile).toString());
 } catch (Exception e) { throw new RuntimeException( Error reading file [  + map.get( wordsFile) +  ] 
 }
 }
 @Override
 public void nextTuple() { if (completed) {
 try { Thread.sleep(1000);
 } catch (InterruptedException e) { }
 }
 String line;
 BufferedReader reader = new BufferedReader(fileReader);
 try { while ((line = reader.readLine()) != null) { Values values = new Values(line);
 UUID msgId = UUID.randomUUID();
 this.pending.put(msgId, values);
 this.collector.emit(values, msgId);
 }
 } catch (Exception e) { throw new RuntimeException( Error reading tuple , e);
 } finally {
 completed = true;
 }
 }
 @Override
 public void ack(Object msgId) { this.pending.remove(msgId);
 }
 @Override
 public void fail(Object msgId) { this.collector.emit(this.pending.get(msgId), msgId);
 }
}

实现语句分割 bolt

package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
 private OutputCollector collector;
 private List Integer  numCounterTasks;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 // 获取下游 bolt 的 taskId 列表
 this.numCounterTasks = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID);
 }
 @Override
 public void execute(Tuple tuple) {
 String sentence = tuple.getStringByField( sentence 
 String[] words = sentence.split(   
 for (String word : words) { Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word));
 collector.emitDirect(taskId, tuple, new Values(word));
 }
 this.collector.ack(tuple);
 }
 public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase();
 if (word.isEmpty())
 return 0;
 else {
 // 单词首字母对下游  bolt taskId  列表长度取余
 return word.charAt(0) % numCounterTasks.size();
 }
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word));
 }
}

实现单词计数 bolt 

package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
 private OutputCollector collector;
 private HashMap String, Long  counts = null;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 this.counts = new HashMap String, Long 
 }
 @Override
 public void execute(Tuple tuple) {
 String word = tuple.getStringByField( word 
 Long count = this.counts.get(word);
 if (count == null) {
 count = 0L;
 }
 count++;
 this.counts.put(word, count);
 BufferedWriter writer = null;
 try { writer = new BufferedWriter(new FileWriter( /home/grid/stormData/result.txt));
 Iterator String  keys = this.counts.keySet().iterator();
 while (keys.hasNext()) { String w = keys.next();
 Long c = this.counts.get(w);
 writer.write(w +   :   + c);
 writer.newLine();
 writer.flush();
 }
 } catch (Exception e) { e.printStackTrace();
 } finally { if (writer != null) {
 try { writer.close();
 } catch (Exception e) { e.printStackTrace();
 }
 writer = null;
 }
 }
 this.collector.ack(tuple);
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word ,  count));
 }
}

实现单词计数 topology 

package com.zhch.v3;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
public class WordCountTopology {
 public static final String SENTENCE_SPOUT_ID =  sentence-spout 
 public static final String SPLIT_BOLT_ID =  split-bolt 
 public static final String COUNT_BOLT_ID =  count-bolt 
 public static final String TOPOLOGY_NAME =  word-count-topology-v3 
 public static void main(String[] args) throws Exception { SentenceSpout spout = new SentenceSpout();
 SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
 WordCountBolt countBolt = new WordCountBolt();
 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
 builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4)
 .shuffleGrouping(SENTENCE_SPOUT_ID);
 builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
 .directGrouping(SPLIT_BOLT_ID); // 使用  Direct Grouping  分组策略
 Config config = new Config();
 config.put(wordsFile , args[0]);
 if (args != null   args.length   1) { config.setNumWorkers(2);
 // 集群模式启动
 StormSubmitter.submitTopology(args[1], config, builder.createTopology());
 } else { LocalCluster cluster = new LocalCluster();
 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
 try { Thread.sleep(5 * 1000);
 } catch (InterruptedException e) { }
 cluster.killTopology(TOPOLOGY_NAME);
 cluster.shutdown();
 }
 }
}

提交到 Storm 集群  

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3

运行结果:

[grid@hadoop5 stormData]$ cat result.txt 
second : 1
can : 1
set : 1
simple : 1
use : 2
unbounded : 1
used : 1
It : 1
Storm : 4
online : 1
cases: : 1
open : 1
Apache : 1
of : 2
over : 1
more : 1
clocked : 1
easy : 2
scalable : 1
any : 1
guarantees : 1
ETL : 1
million : 1
continuous : 1
is : 6
with : 1
it : 2
makes : 1
your : 1
a : 4
at : 1
machine : 1
analytics : 1
up : 1
and : 5
many : 1
system : 1
source : 1
what : 1
operate : 1
will : 1
computation : 2
streams : 1
[grid@hadoop6 stormData]$ cat result.txt 
to : 3
for : 2
data : 2
distributed : 2
has : 1
free : 1
programming : 1
reliably : 1
fast: : 1
processing : 2
be : 2
Hadoop : 1
did : 1
fun : 1
learning : 1
torm : 1
process : 1
RPC : 1
node : 1
processed : 2
per : 2
realtime : 3
benchmark : 1
batch : 1
doing : 1
lot : 1
language : 1
tuples : 1
fault-tolerant : 1

到此,关于“Storm 中怎么使用 Direct Grouping 分组策略”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计7440字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)