共计 2761 个字符,预计需要花费 7 分钟才能阅读完成。
本篇内容介绍了“Spark2.1.0 怎么用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
运行 spark-shell
在《Spark2.1.0 之运行环境准备》一文曾经简单运行了 spark-shell,并用下图进行了展示(此处再次展示此图)。
图 1 执行 spark-shell 进入 Scala 命令行
图 1 中显示了很多信息,这里进行一些说明:
在安装完 Spark 2.1.0 后,如果没有明确指定 log4j 的配置,那么 Spark 会使用 core 模块的 org/apache/spark/ 目录下的 log4j-defaults.properties 作为 log4j 的默认配置。log4j-defaults.properties 指定的 Spark 日志级别为 WARN。用户可以到 Spark 安装目录的 conf 文件夹下从 log4j.properties.template 复制一份 log4j.properties 文件,并在其中增加自己想要的配置。
除了指定 log4j.properties 文件外,还可以在 spark-shell 命令行中通过 sc.setLogLevel(newLevel) 语句指定日志级别。
SparkContext 的 Web UI 的地址是:http://192.168.0.106:4040。192.168.0.106 是笔者安装 Spark 的机器的 ip 地址,4040 是 SparkContext 的 Web UI 的默认监听端口。
指定的部署模式(即 master)为 local[*]。当前应用(Application)的 ID 为 local-1497084620457。
可以在 spark-shell 命令行通过 sc 使用 SparkContext,通过 spark 使用 SparkSession。sc 和 spark 实际分别是 SparkContext 和 SparkSession 在 Spark REPL 中的变量名,具体细节已在《Spark2.1.0 之剖析 spark-shell》一文有过分析。
由于 Spark core 的默认日志级别是 WARN,所以看到的信息不是很多。现在我们将 Spark 安装目录的 conf 文件夹下的 log4j.properties.template 以如下命令复制出一份:
cp log4j.properties.template log4j.properties
cp log4j.properties.template log4j.properties
并将 log4j.properties 中的 log4j.logger.org.apache.spark.repl.Main=WARN 修改为 log4j.logger.org.apache.spark.repl.Main=INFO,然后我们再次运行 spark-shell,将打印出更丰富的信息,如图 2 所示。
图 2 Spark 启动过程打印的部分信息
从图 2 展示的启动日志中我们可以看到 SecurityManager、SparkEnv、BlockManagerMasterEndpoint、DiskBlockManager、MemoryStore、SparkUI、Executor、NettyBlockTransferService、BlockManager、BlockManagerMaster 等信息。它们是做什么的?刚刚接触 Spark 的读者只需要知道这些信息即可,具体内容将在后边的博文给出。
执行 word count
这一节,我们通过 word count 这个耳熟能详的例子来感受下 Spark 任务的执行过程。启动 spark-shell 后,会打开 Scala 命令行,然后按照以下步骤输入脚本:
步骤 1
输入 val lines =sc.textFile(../README.md , 2),以 Spark 安装目录下的 README.md 文件的内容作为 word count 例子的数据源,执行结果如图 3 所示。
图 3 步骤 1 执行结果
图 3 告诉我们 lines 的实际类型是 MapPartitionsRDD。
步骤 2
textFile 方法对文本文件是逐行读取的,我们需要输入 val words =lines.flatMap(line = line.split()),将每行文本按照空格分隔以得到每个单词,执行结果如图 4 所示。
图 4 步骤 2 执行结果
图 4 告诉我们 lines 在经过 flatMap 方法的转换后得到的 words 的实际类型也是 MapPartitionsRDD。
步骤 3
对于得到的每个单词,通过输入 val ones = words.map(w = (w,1)),将每个单词的计数初始化为 1,执行结果如图 5 所示。
图 5 步骤 3 执行结果
图 5 告诉我们 words 在经过 map 方法的转换后得到的 ones 的实际类型也是 MapPartitionsRDD。
步骤 4
输入 val counts = ones.reduceByKey(_ + _),对单词进行计数值的聚合,执行结果如图 6 所示。
图 6 步骤 4 执行结果
图 6 告诉我们 ones 在经过 reduceByKey 方法的转换后得到的 counts 的实际类型是 ShuffledRDD。
步骤 5
输入 counts.foreach(println),将每个单词的计数值打印出来,作业的执行过程如图 7 和图 8 所示。作业的输出结果如图 9 所示。
图 7 步骤 5 执行过程第一部分
图 8 步骤 5 执行过程第二部分
图 7 和图 8 展示了很多作业提交、执行的信息,这里挑选关键的内容进行介绍:
SparkContext 为提交的 Job 生成的 ID 是 0。
一共有四个 RDD,被划分为 ResultStage 和 ShuffleMapStage。ShuffleMapStage 的 ID 为 0,尝试号为 0。ResultStage 的 ID 为 1,尝试号也为 0。在 Spark 中,如果 Stage 没有执行完成,就会进行多次重试。Stage 无论是首次执行还是重试都被视为是一次 Stage 尝试(Stage Attempt),每次 Attempt 都有一个唯一的尝试号(AttemptNumber)。
由于 Job 有两个分区,所以 ShuffleMapStage 和 ResultStage 都有两个 Task 被提交。每个 Task 也会有多次尝试,因而也有属于 Task 的尝试号。从图中看出 ShuffleMapStage 中的两个 Task 和 ResultStage 中的两个 Task 的尝试号也都是 0。
HadoopRDD 则用于读取文件内容。
图 9 步骤 5 输出结果
图 9 展示了单词计数的输出结果和最后打印的任务结束的日志信息。
“Spark2.1.0 怎么用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!