共计 3111 个字符,预计需要花费 8 分钟才能阅读完成。
这期内容当中丸趣 TV 小编将会给大家带来有关 storm java 的编程思路是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
总体思路
storm 编程和 hadoop 的 mapreduce 的编程很类似,hadoop 的 mapreduce 需要自己实现 map 函数,reduce 函数,还有一个主类驱动;storm 需要自己实现 spout,bolt 和一个主函数。storm 编程为以下三步:
创建一个 Spout 读取数据
创建 bolt 处理数据
创建一个主类,在主类中创建拓扑和一个集群对象,将拓扑提交到集群
Topology 运行方式
Topology 的运行可以分为本地模式和分布式模式,模式的设置可以在配置文件中设定,也可以在代码中设置。本地模式其实什么都不需要安装,有 storm jar 包就够了
(1)本地运行的提交方式:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, conf, topology);
cluster.killTopology(topologyName);
cluster.shutdown();
(2)分布式提交方式:
StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
需要注意的是,在 Storm 代码编写完成之后,需要打包成 jar 包放到 Nimbus 中运行,打包的时候,不需要把依赖的 jar 都打进去,否则如果把依赖的 storm.jar 包打进去的话,运行时会出现重复的配置文件错误导致 Topology 无法运行。因为 Topology 运行之前,会加载本地的 storm.yaml 配置文件。
在 Nimbus 运行的命令如下:
storm jar StormTopology.jar maincalss args
Topology 运行流程
有几点需要说明的地方:
(1)Storm 提交后,会把代码首先存放到 Nimbus 节点的 inbox 目录下,之后,会把当前 Storm 运行的配置生成一个 stormconf.ser 文件放到 Nimbus 节点的 stormdist 目录中,在此目录中同时还有序列化之后的 Topology 代码文件;
(2) 在设定 Topology 所关联的 Spouts 和 Bolts 时,可以同时设置当前 Spout 和 Bolt 的 executor 数目和 task 数目,默认情况下,一个 Topology 的 task 的总和是和 executor 的总和一致的。之后,系统根据 worker 的数目,尽量平均的分配这些 task 的执行。worker 在哪个 supervisor 节点上运行是由 storm 本身决定的;
(3)任务分配好之后,Nimbes 节点会将任务的信息提交到 zookeeper 集群,同时在 zookeeper 集群中会有 workerbeats 节点,这里存储了当前 Topology 的所有 worker 进程的心跳信息;
(4)Supervisor 节点会不断的轮询 zookeeper 集群,在 zookeeper 的 assignments 节点中保存了所有 Topology 的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor 通过轮询此节点的内容,来领取自己的任务,启动 worker 进程运行;
(5)一个 Topology 运行之后,就会不断的通过 Spouts 来发送 Stream 流,通过 Bolts 来不断的处理接收到的 Stream 流,Stream 流是无界的。
最后一步会不间断的执行,除非手动结束 Topology。
Topology 方法调用流程
Topology 中的 Stream 处理时的方法调用过程如下:
有几点需要说明的地方:
(1)每个组件 (Spout 或者 Bolt) 的构造方法和 declareOutputFields 方法都只被调用一次。
(2)open 方法、prepare 方法的调用是多次的。入口函数中设定的 setSpout 或者 setBolt 里的并行度参数指的是 executor 的数目,是负责运行组件中的 task 的线程 的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个 executor 运行的时候调用一次。相当于一个线程的构造方法。
(3)nextTuple 方法、execute 方法是一直被运行的,nextTuple 方法不断的发射 Tuple,Bolt 的 execute 不断的接收 Tuple 进行处理。只有这样不断地运行,才会产生无界的 Tuple 流,体现实时性。相当于线程的 run 方法。
(4)在提交了一个 topology 之后,Storm 就会创建 spout/bolt 实例并进行序列化。之后,将序列化的 component 发送给所有的任务所在的机器 (即 Supervisor 节点),在每一个任务上反序列化 component。
(5)Spout 和 Bolt 之间、Bolt 和 Bolt 之间的通信,是通过 zeroMQ 的消息队列实现的。
(6) 上图没有列出 ack 方法和 fail 方法,在一个 Tuple 被成功处理之后,需要调用 ack 方法来标记成功,否则调用 fail 方法标记失败,重新处理这个 Tuple。
Topology 并行度
在 Topology 的执行单元里,有几个和并行度相关的概念。
(1)worker: 每个 worker 都属于一个特定的 Topology,每个 Supervisor 节点的 worker 可以有多个,每个 worker 使用一个单独的端口,它对 Topology 中的每个 component 运行一个或者多个 executor 线程来提供 task 的运行服务。
(2)executor:executor 是产生于 worker 进程内部的线程,会执行同一个 component 的一个或者多个 task。
(3)task: 实际的数据处理由 task 完成,在 Topology 的生命周期中,每个组件的 task 数目是不会发生变化的,而 executor 的数目却不一定。executor 数目小于等于 task 的数目,默认情况下,二者是相等的。
在运行一个 Topology 时,可以根据具体的情况来设置不同数量的 worker、task、executor,而设置的位置也可以在多个地方。
(1)worker 设置:
(1.1) 可以通过设置 yaml 中的 topology.workers 属性
(1.2) 在代码中通过 Config 的 setNumWorkers 方法设定
(2)executor 设置:
通过在 Topology 的入口类中 setBolt、setSpout 方法的最后一个参数指定,不指定的话,默认为 1;
(3)task 设置:
(3.1) 默认情况下,和 executor 数目一致;
(3.2) 在代码中通过 TopologyBuilder 的 setNumTasks 方法设定具体某个组件的 task 数目;
终止 Topology
通过在 Nimbus 节点利用如下命令来终止一个 Topology 的运行:
storm kill topologyName
kill 之后,可以通过 UI 界面查看 topology 状态,会首先变成 KILLED 状态,在清理完本地目录和 zookeeper 集群中的和当前 Topology 相关的信息之后,此 Topology 就会彻底消失了。
Topology 跟踪
Topology 提交后,可以在 Nimbus 节点的 web 界面查看,默认的地址是 http://NimbusIp:8080。
上述就是丸趣 TV 小编为大家分享的 storm java 的编程思路是什么了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注丸趣 TV 行业资讯频道。