Storm怎么改变并行度

69次阅读
没有评论

共计 6644 个字符,预计需要花费 17 分钟才能阅读完成。

这篇文章主要介绍“Storm 怎么改变并行度”,在日常操作中,相信很多人在 Storm 怎么改变并行度问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm 怎么改变并行度”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

package bolts;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
public class WordNormalizer implements IRichBolt{
 private OutputCollector collector;
 
 public void cleanup(){}
 
 /**
 * The bolt will receive the line from the
 * words file and process it to Normalize this line
 *
 * The normalize will be put the words in lower case
 * and split the line to get all words in this
 */
 
 public void execute(Tuple input) { String sentence = input.getString(0);
 String[]words= sentence.split(   
 for(String word:words){ word =word.trim();
 if(!word.isEmpty()){ word =word.toLowerCase();
 //Emit the word
 List a =new ArrayList();
 a.add(input);
 collector.emit(a,new Values(word));
 }
 }
 // Acknowledge the tuple
 collector.ack(input);
 }
 
 public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
 this.collector=collector;
 }
 
 /**
 * The bolt will only emit the field  word 
 */
 public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields( word));
 }
 
}

提示:在这个类中,每调用一次 execute()方法,会发送多个元组。例如,当 execute()方法收到“This is the Storm book”这个句子时,该方法会发送 5 个新元组。

第二个 bolt,WordCounter,负责统计每个单词个数。当 topology 结束时 (cleanup() 方法被调用时),显示每个单词的个数。

提示:第二个 bolt 中什么也不发送,本例中,将数据添加到一个 map 对象中,但是现实生活中,bolt 可以将数据存储到一个数据库中。

package bolts;
 
import java.util.HashMap;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
 
public class WordCounter implements IRichBolt{
 Integer id;
 String name;
 Map String,Integer counters;
 
 private OutputCollector collector;
 
 /**
 * At the end of the spout (when the cluster is shutdown
 * We will show the word counters
 */
 
 @Override
 
 public void cleanup(){ System.out.println( -- Word Counter [ +name+ - +id+]-- 
 for(Map.Entry String,Integer entry: counters.entrySet()){ System.out.println(entry.getKey()+ :  +entry.getValue());
 }
 }
 
 /**
 * On each word We will count
 */
 @Override
 
 public void execute(Tuple input) { String str =input.getString(0);
 /**
 * If the word dosn t exist in the map we will create
 * this, if not We will add 1
 */
 if(!counters.containsKey(str)){ counters.put(str,1);
 }else{ Integer c =counters.get(str) +1;
 counters.put(str,c);
 }
 //Set the tuple as Acknowledge
 collector.ack(input);
 }
 
 /**
 * On create
 */
 
 @Override
 
 public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
 this.counters=newHashMap String,Integer 
 this.collector=collector;
 this.name=context.getThisComponentId();
 this.id=context.getThisTaskId();
 }
 
 @Override
 
 public void declareOutputFields(OutputFieldsDeclarer declarer) {}
 
}

execute()方法使用一个映射 (Map 类型) 采集单词并统计这些单词个数。当 topology 结束的时候,cleanup()方法被调用并且打印出 counter 映射。(这仅仅是个例子,通常情况下,当 topology 关闭时,你应该使用 cleanup()方法关闭活动链接和其他资源。)

主类

在主类中,你将创建 topology 和一个 LocalCluster 对象,LocalCluster 对象使你可以在本地测试和调试 topology。LocalCluster 结合 Config 对象允许你尝试不同的集群配置。例如,如果不慎使用一个全局变量或者类变量,当配置不同数量的 worker 测试 topology 的时候,你将会发现这个错误。(关于 config 对象在第三章会有更多介绍)

提示:所有的 topology 结点应该可以在进程间没有数据共享的情形下独立运行(也就是说没有全局或者类变量),因为当 topology 运行在一个真实的集群上时,这些进程可能运行在不同的机器上。

你将使用 TopologyBuilder 创建 topology,TopologyBuilder 会告诉 Storm 怎么安排节点顺序、它们怎么交换数据。

TopologyBuilder builder =new TopologyBuilder();
builder.setSpout(word-reader ,new WordReader());
builder.setBolt(word-normalizer ,new WordNormalizer()).shuffleGrouping( word-reader 
builder.setBolt(word-counter ,new WordCounter(),2).fieldsGrouping(word-normalizer ,new Fields( word));

本例中 spout 和 bolt 之间使用随机分组 (shuffleGrouping) 连接,这种分组类型告诉 Storm 以随机分布的方式从源节点往目标节点发送消息。

接着,创建一个包含 topology 配置信息的 Config 对象,该配置信息在运行时会与集群配置信息合并,并且通过 prepare()方法发送到所有节点。

Config conf =new Config();
conf.put(wordsFile ,args[0]);
conf.setDebug(false);

将 wordFile 属性设置为将要被 spout 读取的文件名称(文件名在 args 参数中传入),并将 debug 属性设置为 true,因为你在开发过程中,当 debug 为 true 时,Storm 会打印节点间交换的所有消息和其他调试数据,这些信息有助于理解 topology 是如何运行的。

前面提到,你将使用 LocalCluster 来运行 topology。在一个产品环境中,topology 会持续运行,但是在本例中,你仅需运行 topology 几秒钟就能看到结果。

LocalCluster cluster =new LocalCluster();
cluster.submitTopology(Getting-Started-Toplogie ,conf,builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();

使用 createTopology 和 submitTopology 创建、运行 topology,睡眠两秒(topology 运行在不同的线程中),然后通过关闭集群来停止 topology。

例 2 - 3 将上面代码拼凑到一起。

例 2 -3.src/main/java/TopologyMain.java

import spouts.WordReader;
import bolts.WordCounter;
import bolts.WordNormalizer;
 
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
 
public class TopologyMain{ public static void main(String[]args)throws InterruptedException{
 //Topology definition
 TopologyBuilder builder =new TopologyBuilder();
 builder.setSpout(word-reader ,new WordReader());
 builder.setBolt(word-normalizer ,new WordNormalizer()).shuffleGrouping( word-reader 
 builder.setBolt(word-counter ,new WordCounter(),2).fieldsGrouping(word-normalizer ,new Fields( word));
 
 //Configuration
 Config conf =new Config();
 conf.put(wordsFile ,args[0]);
 conf.setDebug(false);
 
 //Topology run
 conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
 LocalCluster cluster =new LocalCluster();
 cluster.submitTopology(Getting-Started-Toplogie ,conf,builder.createTopology());
 Thread.sleep(1000);
 cluster.shutdown();
 }
 
}

运行本项目

现在开始准备运行第一个 topology!如果你新建一个文本文件(src/main/resources/words.txt)并且每行一个单词,则可以通过如下命令运行这个 topology:

mvn exec:java -Dexec.main >

例如,如果你使用如下 words.txt 文件:

Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great

在日志中,你将会看到类似如下信息:

is: 2
application: 1
but: 1
great: 1
test: 1
simple: 1
Storm: 3
really: 1
are: 1
great: 1
an: 1
powerful: 1
very: 1

在本例中,你只使用了每个结点的一个单一实例,假如此时有一个非常大的日志文件怎么去统计每个单词的个数?此时可以很方便地改系统中节点数量来并行工作,如创建 WordCounter 的两个实例:

1builder.setBolt(   
    word-counter    
    ,    
    new    
    WordCounter(),    
    2    
    ).shuffleGrouping(   
    word-normalizer    
    );

重新运行这个程序,你将看到:

– Word Counter [word-counter-2] –
application: 1
is: 1
great: 1
are: 1
powerful: 1
Storm: 3
– Word Counter [word-counter-3] –
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1

太棒了!改变并行度,so easy(当然,在实际生活中,每个实例运行在不同的机器中)。但仔细一看似乎还有点问题:“is”和“great”这两个单词在每个 WordCounter 实例中都被计算了一次。Why?当使用随机分组 (shuffleGrouping) 时,Storm 以随机分布的方式向每个 bolt 实例发送每条消息。在这个例子中,将相同的单词发送到同一个 WordCounter 实例是更理想的。为了实现这个,你可以将 shuffleGrounping(“word-normalizer”)改成 fieldsGrouping(“word-normalizer”,new Fields(“word”))。尝试一下并重新运行本程序来确认结果。

到此,关于“Storm 怎么改变并行度”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计6644字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)