Storm的优点有哪些

83次阅读
没有评论

共计 7603 个字符,预计需要花费 20 分钟才能阅读完成。

本篇内容介绍了“Storm 的优点有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Storm 和 hadoop 的区别

数据来源:HADOOP 是 HDFS 上某个文件夹下的可能是成 TB 的数据,STORM 是实时新增的某一笔数据

处理过程:HADOOP 是分 MAP 阶段到 REDUCE 阶段,STORM 是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源 (SPOUT) 或处理逻辑(BOLT)

是否结束:HADOOP 最后是要结束的,STORM 是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始

处理速度:HADOOP 是以处理 HDFS 上大量数据为目的,速度慢,STORM 是只要处理新增的某一笔数据即可可以做到很快。

适用场景:HADOOP 是在要处理一批数据时用的,不讲究时效性,要处理就提交一个 JOB,STORM 是要处理某一新增数据时用的,要讲时效性

与 MQ 对比:HADOOP 没有对比性,STORM 可以看作是有 N 个步骤,每个步骤处理完就向下一个 MQ 发送消息,监听这个 MQ 的消费者继续处理

好的编程模型让开发者专注于业务逻辑; 不好的编程模型让开发者把时间花费在通信,处理异常等琐事上.

编程模型例子:

用 hadoop 的 MapReduce 和 MPI 做一个对比,在 hadoop 的 MapReduce 里面呢,它的编程模型里面呢,map 和 reduce,你只用去写 map 和 reduce 函数,以及一些简单的驱动,程序就能跑起来,你不用关心 map 和数据是怎么切分的,map 和 reduce 是怎么传输的,reduce 的数据是怎么写到 hadoop 的 HDFS 里面的,这些你都不用关心,看起来写 mapreduce 就是单机的代码,没有什么分布式的特点在里面啊,但是它运行的分布式框架来帮你做上述这些东西。

反过来我们看写 MPI 的程序就完全不一样,写 MPI 的时候你就会很明显的感觉到你在写一个并行分布式程序,你需要在很多地方显式的去调数据传输的接口,你还要显式的去调一些数据同步的接口,这样才能把 MPI 程序显式的给 RUN 起来,这就是编程模型不同导致的不同的开发体验,其实这不仅仅是开发容易不容易的问题,更主要的是一个开发效率的问题,其实更简单的程序更能写出健壮的程序,写 mapreduce 程序是很简单的,但是要写出一个稳定靠谱的 MPI 程序就难一些

架构

Nimbus

Supervisor

Worker

编程模型:

DAG

Spout

Bolt

数据传输:

Zmq

Zmq 也是开源的消息传递的框架,虽然叫 mq,但它并不是一个 message queue,而是一个封装的比较好的

Netty

netty 是 NIO 的网络框架,效率比较高。之所以有 netty 是 storm 在 apache 之后呢,zmq 的 license 和 storm 的 license 不兼容的,bolt 处理完消息后会告诉 Spout。

高可用性

异常处理

消息可靠性保证机制

可维护性:

Storm 有个 UI 可以看跑在上面的程序监控

实时请求应答服务(同步),

实时请求应答服务(同步),往往不是一个很简单的操作,而且大量的操作,用 DAG 模型来提高请求处理速度

DRPC

实时请求处理

例子:发送图片,或者图片地址,进行图片特征的提取

这里 DRPC Server 的好处是什么呢?这样看起来就像是一个 Server,经过 Spout,然后经过 Bolt,不是更麻烦了吗?DRPC Server 其实适用于分布式,可以应用分布式处理这个单个请求,来加速处理的过程。

DRPCClientclient = new DRPCClient(drpc-host , 3772);
String result = client.execute( reach , http://twitter.com 
// 服务端由四部分组成:包括一个 DRPC Server,一个 DPRC Spout,一个 Topology 和一个 ReturnResult。

流式处理(异步)—  不是说不快,而是不是等待结果

逐条处理

例子:ETL,把关心的数据提取,标准格式入库,它的特点是我把数据给你了,不用再返回给我,这个是异步的

分析统计

例子:日志 PV,UV 统计,访问热点统计,这类数据之间是有关联的,比如按某些字段做聚合,加和,平均等等

最后写到 Redis,Hbase,MySQL,或者其他的 MQ 里面去给其他的系统去消费。

/**
 * ShuffleGrouping(spout)就是从 spout 来订阅数据,fieldGrouping(split , new Fields( word))
 *  实际上就是一个 hash,同一个词有相同的 hash,然后就会被 hash 到同一个 WordCount 的 bolt 里面,然后就
 *  可以进行计数。接下来两行呢是配置文件,然后是配置 3 个 worker,接下来是通过 Submitter 提交 Topology
 *  到 Storm 集群里面去。程序会编译打包,这段代码来自 storm 里面的 starter 的一段代码,这个代码怎么真正
 *  运行起来呢,就用 storm jar  然后 jar 包的名,然后就是类的名字,和 topology 的名字,因为这里有个 args[0]。 *  这段代码很简单,首先呢,第一部分构造了一个 DAG 的有向无环图,然后生成配置,提交到 Storm 集群去。 * */
public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(spout , new RandomSentenceSpout(), 5);
 builder.setBolt(split , new SplitSentence(), 8).shuffleGrouping( spout 
 builder.setBolt(count , new WordCount(), 12).fieldsGrouping(split , new Fields( word));
 Config conf = new Config();
 conf.setDebug(true);
 
 if(args != null   args.length   0) { conf.setNumWorkers(3);
 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
 }
 }
###Linux: storm jar storm-starter-0.9.4.jar storm.starter.WordCountTopology wordcount

 

Cluster Summary(整个集群的)

一个 slot 就是一个 worker,一个 worker 里面是一个 jvm,一个 worker 里面呢可以有多个 executor,每一个 executor 就是执行线程,每一个 executor 上面执行一个或多个 Task,一般来说默认是一个 task。

Topology Summary(每个应用程序的)

一个应用程序就是一个 Topology,它有名字,还有 ID,然后有个状态,ACTIVE 就是正在运行,KILLED 就是已经被杀掉了。

Topology actions 就是可以对 Topology 采取一些操作,Deactivate 就是暂停,Rebalance 就是重新做一下 balance,然后 kill 就是杀掉这个应用。

这个应用运行的到底怎么样呢,在 Topology stats 里面有个整体的统计,有 10 分钟,3 小时,1 天,还有所有的统计,这里面比较关键的呢,是 Complete latency,它的意思就是一条数据从发出去到处理完花了多长时间,第二个比较关键的呢就是 ACK,这个反映的是吞吐,前面的 Complete latency 反映的延迟。

在 Spouts 的统计信息里面呢,一个是 spout 的名字,和代码里面是对应的,第二个呢是这个 spout 它有多少个 executor,然后呢它有多少个 task,然后呢是它在一定时间内往外 emit 出多少数据,真正 tranfer 传输了多少数据,然后它 latency 延迟是多少,然后 ACK 处理了多少数据,后面还有错误的信息。

Bolt 也类似,通过这个 UI 页面可以实时观看这些统计信息,是非常有用的,可以知道哪个环节比较慢,哪些地方有没有什么瓶颈了,有瓶颈了是不是加一个并发来解决问题。

Spout 中这里最关键的是一个 nextTuple(),它是从外部取数据的源头,可以从 DPRC 取数据,可以从 MQ,比如 Kafka 中取数据,然后给后面的 bolt 进行处理,然后这里 wordcount 没有那么复杂,就自己随机的生成了数据。

_collector.emit(new Values(sentence), new Object());

这个代码后面 new Object()等于是随机的生成了一个 message 的 ID,这个 ID 有什么用,后面会讲到,实际上它是消息可靠性保障的一部分。有了这个 ID 呢 Storm 就可以帮你去跟踪这条消息到底有没有被处理完,如果处理完了呢?

如果处理完了,它就是调用一个 ack 告诉 spout,我已经处理完了,这里 ack 方法里面仅仅是把 id 打印出来,因为这里 id 没有什么意义,仅仅是为了展示,相反,如果在一定时间内没有处理完,会调用 fail 告诉说消息处理失败了。

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 *  License  you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an  AS IS  BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package storm.starter.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
public class RandomSentenceSpout extends BaseRichSpout {
 SpoutOutputCollector _collector;
 Random _rand;

 @Override  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {  _collector = collector;  _rand = new Random();  }
 Utils.sleep(100);  String[] sentences = new String[]{  the cow jumped over the moon ,  an apple a day keeps the doctor away ,   four score and seven years ago ,  snow white and the seven dwarfs ,  i am at two with nature  };  String sentence = sentences[_rand.nextInt(sentences.length)];  _collector.emit(new Values(sentence), new Object());  }  @Override  public void ack(Object id) { System.out.println(id);  }  @Override  public void fail(Object id) { System.out.println(id);  }  @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields( word));  }  @Override  public Map String, Object  getComponentConfiguration(){  return null;  } public static class WordCount1 extends BaseBasicBolt{  Map String, Integer  counts = new HashMap String, Integer  @Override  public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {  String word = tuple.getStringByField( word  Integer count = counts.get(word);  if(count==null){  count=0;  }  count++;  counts.put(word,count);  System.out.println(word++ +word+ ========= +count);  basicOutputCollector.emit(new Values(word,count));  }  @Override  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word , count));  }  }

 

对于 wordcount 的示例,它是有两个 blot,一个 bolt 是分词,一个 bolt 是计数,这里 SplitSentence 是展示它支持多语言的开发,其实这里代码调用的是 python 的 splitsentence.py,使用的是 ShellBolt 这个组件

那 wordcount 这个 bolt 是用 java 实现的,它的实现核心是亮点,一点是有 execute 这样一个函数,第二个是 declareOutputFields 这个函数,这两个函数的作用其实是很什么呢?最核心的其实是 execute,execute 的作用呢就是拿到输入的数据 Tuple,然后再 emit 数据出去。

以上就是在 storm 里面一个最简单的 wordcount 的例子,它的主函数的代码,它的提交的命令行代码,Spout 是什么样的,Bolt 是什么样的,提交到 Storm 集群之后是一个什么样的运行状况,在 WebUI 上面看到哪些核心的信息,这个在后面的应用开发里面都会大量的运用到。

Storm 与其他技术对比

Storm:进程、线程常驻运行,数据不进入磁盘,网络传递。

MapReduce:TB、PB 级别数据设计的,一次的批处理作业。

Storm:纯流式处理,处理数据单元是一个个 Tuple。另外 Storm 专门为流式处理设计,它的数据传输模式更为简单,很多地方也更为高效。并不是不能做批处理,它也可以来做微批处理,来提高吞吐。

Spark Streaming:微批处理,一个批处理怎么做流式处理呢,它基于内存和 DAG 可以把处理任务做的很快,把 RDD 做的很小来用小的批处理来接近流式处理。

和其它如 MPI 系统相比

通过对比,更能了解 Storm 的一些特点:

首先,相对于 Queue+Worker 来说,它是一个通用的分布式系统,分布式系统的一些细节可以屏蔽掉,比如说水平扩展,容错,上层应用只需要关注自己的业务逻辑就可以了,这一点对应应用开发人员来说是非常重要的,不然的话业务逻辑会被底层的一些细节所打乱。

另外一个,Storm 作为一个纯的流式处理系统,和 mapreduce 的差异相当大,一种称为流式处理,一种称为批处理,Storm 是一个常驻运行的,它的消息收发是很高效的。

和 spark 这种微批处理系统相比呢,Storm 可以处理单条单条的消息。

总的来说呢,Storm 在设计之初呢,就被定义为分布式的流式处理系统,所以说大部分的流式计算需求都可以通过 Storm 很好的满足,Storm 目前在稳定性方面也做的相当不错,对于实时流式计算来说是个非常不错的选择

“Storm 的优点有哪些”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计7603字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)