如何使用Spark Shell进行交互式分析

73次阅读
没有评论

共计 5000 个字符,预计需要花费 13 分钟才能阅读完成。

本文丸趣 TV 小编为大家详细介绍“如何使用 Spark Shell 进行交互式分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“如何使用 Spark Shell 进行交互式分析”文章能帮助大家解决疑惑,下面跟着丸趣 TV 小编的思路慢慢深入,一起来学习新知识吧。

基础

Spark shell 提供了一种来学习该 API 比较简单的方式, 以及一个强大的来分析数据交互的工具。在 Scala(运行于 Java 虚拟机之上, 并能很好的调用已存在的 Java 类库)或者 Python 中它是可用的。通过在 Spark 目录中运行以下的命令来启动它:

Scala

Python

./bin/spark-shell

Spark 的主要抽象是一个称为 Dataset 的分布式的 item 集合。Datasets 可以从 Hadoop 的 InputFormats(例如 HDFS 文件)或者通过其它的 Datasets 转换来创建。让我们从 Spark 源目录中的 README 文件来创建一个新的 Dataset:

scala  val textFile = spark.read.textFile(README.md)
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

您可以直接从 Dataset 中获取 values(值), 通过调用一些 actions(动作), 或者 transform(转换)Dataset 以获得一个新的。更多细节, 请参阅  API doc。

scala  textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala  textFile.first() // First item in this Dataset
res1: String = # Apache Spark

现在让我们 transform 这个 Dataset 以获得一个新的。我们调用  filter  以返回一个新的 Dataset, 它是文件中的 items 的一个子集。

scala  val linesWithSpark = textFile.filter(line =  line.contains( Spark))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

我们可以链式操作 transformation(转换)和 action(动作):

scala  textFile.filter(line =  line.contains( Spark)).count() // How many lines contain  Spark ?
res3: Long = 15

Dataset 上的更多操作

Dataset actions(操作)和 transformations(转换)可以用于更复杂的计算。例如, 统计出现次数最多的单词 :

Scala

Python

scala  textFile.map(line =  line.split(  ).size).reduce((a, b) =  if (a   b) a else b)
res4: Long = 15

第一个 map 操作创建一个新的 Dataset, 将一行数据 map 为一个整型值。在 Dataset 上调用  reduce  来找到最大的行计数。参数  map  与  reduce  是 Scala 函数(closures), 并且可以使用 Scala/Java 库的任何语言特性。例如, 我们可以很容易地调用函数声明, 我们将定义一个 max 函数来使代码更易于理解 :

scala  import java.lang.Math
import java.lang.Math
scala  textFile.map(line =  line.split(  ).size).reduce((a, b) =  Math.max(a, b))
res5: Int = 15

一种常见的数据流模式是被 Hadoop 所推广的 MapReduce。Spark 可以很容易实现 MapReduce:

scala  val wordCounts = textFile.flatMap(line =  line.split(  )).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

在这里, 我们调用了  flatMap  以 transform 一个 lines 的 Dataset 为一个 words 的 Dataset, 然后结合  groupByKey  和  count  来计算文件中每个单词的 counts 作为一个 (String, Long) 的 Dataset pairs。要在 shell 中收集 word counts, 我们可以调用  collect:

scala  wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

缓存

Spark 还支持 Pulling(拉取)数据集到一个群集范围的内存缓存中。例如当查询一个小的“hot”数据集或运行一个像 PageRANK 这样的迭代算法时, 在数据被重复访问时是非常高效的。举一个简单的例子, 让我们标记我们的  linesWithSpark  数据集到缓存中:

Scala

Python

scala  linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala  linesWithSpark.count()
res8: Long = 15
scala  linesWithSpark.count()
res9: Long = 15

使用 Spark 来探索和缓存一个 100 行的文本文件看起来比较愚蠢。有趣的是, 即使在他们跨越几十或者几百个节点时, 这些相同的函数也可以用于非常大的数据集。您也可以像   编程指南. 中描述的一样通过连接  bin/spark-shell  到集群中, 使用交互式的方式来做这件事情。

独立的应用

假设我们希望使用 Spark API 来创建一个独立的应用程序。我们在 Scala(SBT), Java(Maven)和 Python 中练习一个简单应用程序。

Scala

Java

Python

我们将在 Scala 中创建一个非常简单的 Spark 应用程序 – 很简单的, 事实上, 它名为  SimpleApp.scala:

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp { def main(args: Array[String]) {
 val logFile =  YOUR_SPARK_HOME/README.md  // Should be some file on your system
 val spark = SparkSession.builder.appName(Simple Application).getOrCreate()
 val logData = spark.read.textFile(logFile).cache()
 val numAs = logData.filter(line =  line.contains( a)).count()
 val numBs = logData.filter(line =  line.contains( b)).count()
 println(s Lines with a: $numAs, Lines with b: $numBs)
 spark.stop()
 }
}

注意, 这个应用程序我们应该定义一个  main()  方法而不是去扩展  scala.App。使用  scala.App  的子类可能不会正常运行。

该程序仅仅统计了 Spark README 文件中每一行包含‘a’的数量和包含‘b’的数量。注意, 您需要将 YOUR_SPARK_HOME 替换为您 Spark 安装的位置。不像先前使用 spark shell 操作的示例, 它们初始化了它们自己的 SparkContext, 我们初始化了一个 SparkContext 作为应用程序的一部分。

我们调用  SparkSession.builder  以构造一个 [[SparkSession]], 然后设置 application name(应用名称), 最终调用  getOrCreate  以获得 [[SparkSession]] 实例。

我们的应用依赖了 Spark API, 所以我们将包含一个名为  build.sbt  的 sbt 配置文件, 它描述了 Spark 的依赖。该文件也会添加一个 Spark 依赖的 repository:

name :=  Simple Project 
version :=  1.0 
scalaVersion :=  2.11.8 
libraryDependencies +=  org.apache.spark  %%  spark-sql  %  2.2.0

为了让 sbt 正常的运行, 我们需要根据经典的目录结构来布局  SimpleApp.scala  和  build.sbt  文件。在成功后, 我们可以创建一个包含应用程序代码的 JAR 包, 然后使用  spark-submit  脚本来运行我们的程序。

# Your directory layout should look like this
$ find .
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
 --class  SimpleApp  \
 --master local[4] \
 target/scala-2.11/simple-project_2.11-1.0.jar
Lines with a: 46, Lines with b: 23

快速跳转

恭喜您成功的运行了您的第一个 Spark 应用程序!

更多 API 的深入概述, 从  RDD programming guide  和  SQL programming guide  这里开始, 或者看看“编程指南”菜单中的其它组件。

为了在集群上运行应用程序, 请前往  deployment overview.

最后, 在 Spark 的  examples  目录中包含了一些 (Scala, Java, Python, R) 示例。您可以按照如下方式来运行它们:

#  针对  Scala  和  Java,  使用  run-example:
./bin/run-example SparkPi
#  针对  Python  示例,  直接使用  spark-submit:
./bin/spark-submit examples/src/main/python/pi.py
#  针对  R  示例,  直接使用  spark-submit:
./bin/spark-submit examples/src/main/r/dataframe.R

读到这里,这篇“如何使用 Spark Shell 进行交互式分析”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-04发表,共计5000字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)