Spark2.1.0怎么用

78次阅读
没有评论

共计 2761 个字符,预计需要花费 7 分钟才能阅读完成。

本篇内容介绍了“Spark2.1.0 怎么用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

运行 spark-shell

    在《Spark2.1.0 之运行环境准备》一文曾经简单运行了 spark-shell,并用下图进行了展示(此处再次展示此图)。

图 1   执行 spark-shell 进入 Scala 命令行

图 1 中显示了很多信息,这里进行一些说明:

在安装完 Spark 2.1.0 后,如果没有明确指定 log4j 的配置,那么 Spark 会使用 core 模块的 org/apache/spark/ 目录下的 log4j-defaults.properties 作为 log4j 的默认配置。log4j-defaults.properties 指定的 Spark 日志级别为 WARN。用户可以到 Spark 安装目录的 conf 文件夹下从 log4j.properties.template 复制一份 log4j.properties 文件,并在其中增加自己想要的配置。

除了指定 log4j.properties 文件外,还可以在 spark-shell 命令行中通过 sc.setLogLevel(newLevel) 语句指定日志级别。

SparkContext 的 Web UI 的地址是:http://192.168.0.106:4040。192.168.0.106 是笔者安装 Spark 的机器的 ip 地址,4040 是 SparkContext 的 Web UI 的默认监听端口。

指定的部署模式(即 master)为 local[*]。当前应用(Application)的 ID 为 local-1497084620457。

可以在 spark-shell 命令行通过 sc 使用 SparkContext,通过 spark 使用 SparkSession。sc 和 spark 实际分别是 SparkContext 和 SparkSession 在 Spark REPL 中的变量名,具体细节已在《Spark2.1.0 之剖析 spark-shell》一文有过分析。

  由于 Spark core 的默认日志级别是 WARN,所以看到的信息不是很多。现在我们将 Spark 安装目录的 conf 文件夹下的 log4j.properties.template 以如下命令复制出一份:

cp log4j.properties.template log4j.properties 

cp log4j.properties.template log4j.properties

并将 log4j.properties 中的 log4j.logger.org.apache.spark.repl.Main=WARN 修改为 log4j.logger.org.apache.spark.repl.Main=INFO,然后我们再次运行 spark-shell,将打印出更丰富的信息,如图 2 所示。

图 2  Spark 启动过程打印的部分信息

从图 2 展示的启动日志中我们可以看到 SecurityManager、SparkEnv、BlockManagerMasterEndpoint、DiskBlockManager、MemoryStore、SparkUI、Executor、NettyBlockTransferService、BlockManager、BlockManagerMaster 等信息。它们是做什么的?刚刚接触 Spark 的读者只需要知道这些信息即可,具体内容将在后边的博文给出。

执行 word count

  这一节,我们通过 word count 这个耳熟能详的例子来感受下 Spark 任务的执行过程。启动 spark-shell 后,会打开 Scala 命令行,然后按照以下步骤输入脚本:

步骤 1  

  输入 val lines =sc.textFile(../README.md , 2),以 Spark 安装目录下的 README.md 文件的内容作为 word count 例子的数据源,执行结果如图 3 所示。

图 3   步骤 1 执行结果

图 3 告诉我们 lines 的实际类型是 MapPartitionsRDD。

步骤 2

   textFile 方法对文本文件是逐行读取的,我们需要输入 val words =lines.flatMap(line = line.split()),将每行文本按照空格分隔以得到每个单词,执行结果如图 4 所示。

图 4   步骤 2 执行结果

图 4 告诉我们 lines 在经过 flatMap 方法的转换后得到的 words 的实际类型也是 MapPartitionsRDD。

步骤 3

      对于得到的每个单词,通过输入 val ones = words.map(w = (w,1)),将每个单词的计数初始化为 1,执行结果如图 5 所示。

图 5   步骤 3 执行结果

图 5 告诉我们 words 在经过 map 方法的转换后得到的 ones 的实际类型也是 MapPartitionsRDD。

步骤 4

  输入 val counts = ones.reduceByKey(_ + _),对单词进行计数值的聚合,执行结果如图 6 所示。

图 6   步骤 4 执行结果

图 6 告诉我们 ones 在经过 reduceByKey 方法的转换后得到的 counts 的实际类型是 ShuffledRDD。

步骤 5

  输入 counts.foreach(println),将每个单词的计数值打印出来,作业的执行过程如图 7 和图 8 所示。作业的输出结果如图 9 所示。

图 7   步骤 5 执行过程第一部分

图 8   步骤 5 执行过程第二部分

图 7 和图 8 展示了很多作业提交、执行的信息,这里挑选关键的内容进行介绍:

SparkContext 为提交的 Job 生成的 ID 是 0。

一共有四个 RDD,被划分为 ResultStage 和 ShuffleMapStage。ShuffleMapStage 的 ID 为 0,尝试号为 0。ResultStage 的 ID 为 1,尝试号也为 0。在 Spark 中,如果 Stage 没有执行完成,就会进行多次重试。Stage 无论是首次执行还是重试都被视为是一次 Stage 尝试(Stage Attempt),每次 Attempt 都有一个唯一的尝试号(AttemptNumber)。

由于 Job 有两个分区,所以 ShuffleMapStage 和 ResultStage 都有两个 Task 被提交。每个 Task 也会有多次尝试,因而也有属于 Task 的尝试号。从图中看出 ShuffleMapStage 中的两个 Task 和 ResultStage 中的两个 Task 的尝试号也都是 0。

HadoopRDD 则用于读取文件内容。

图 9   步骤 5 输出结果

图 9 展示了单词计数的输出结果和最后打印的任务结束的日志信息。

“Spark2.1.0 怎么用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-04发表,共计2761字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)