什么是 RDD
问题:从一个总计 100 行的文件中找出所有包含“包租婆”的行数 算法如下:
1. 读一行,判断这一行有“包租婆”吗?如果有,全局变量 count 加 1。2. 文件到末尾了吗?如果没有,跳转到第 1 步继续执行。3. 打印 count。
RDD 的概念:全称为 Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。
上述例子中,总计 100 行的文件就是一个 RDD,其中每一行表示一个 RDD 的元素
RDD 两大特性
1. 对集合的每个记录执行相同的操作
- 每一行都做“字符串”检查
- 检查本行是不是到了最后一行
2. 这个操作的具体行为是用户指定的
- 包含“包租婆”就为计数器做 + 1 操作
- 最后一行:结束;不是最后一行:进入下一行检查
RDD 有哪些操作参考资料
1. 创建 RDD
- 从文件中创建
val b = sc.textFile(README.md)
README.md 每一行都是 RDD 的一个元素
- 从普通数组创建 RDD
scala val a = sc.parallelize(1 to 9, 3)
里面包含了 1 到 9 这 9 个数字,它们分别在 3 个分区
2. map
map 是对 RDD 中的每个元素都执行一个指定的函数来产生一个新的 RDD。任何原 RDD 中的元素在新 RDD 中都有且只有一个元素与之对应。 - RDD a 中每个元素都比原来大一倍
scala val b = a.map(x = x*2)
scala b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
3. mapPartitions
mapPartitions 是 map 的一个变种。map 的输入函数是应用于 RDD 中每个元素,而 mapPartitions 的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的
- 函数 myfunc 是把分区中一个元素和它的下一个元素组成一个 Tuple
scala def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]()
var pre = iter.next while (iter.hasNext) {
val cur = iter.next;
res .::= (pre, cur) pre = cur;
scala a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
4. mapValues
mapValues 顾名思义就是输入函数应用于 RDD 中 Kev-Value 的 Value,原 RDD 中的 Key 保持不变,与新的 Value 一起组成新的 RDD 中的元素。因此,该函数只适用于元素为 KV 对的 RDD。_def mapPartitions[U: ClassTag](f: Iterator[T] = Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
f 即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以 Iterator[T] 传递给输入函数 f,f 的输出结果是 Iterator[U]。最终的 RDD 由所有分区经过输入函数处理后的结果合并起来的。_
- RDD b 的 key 是字符串长度,value 是当前元素值;对 b 进行 mapValues 操作,使得 value 首尾字符设为 x
scala val a = sc.parallelize(List( dog , tiger , lion , cat , panther , eagle), 2)
scala val b = a.map(x = (x.length, x))
scala b.mapValues(x + _ + x).collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))
5. mapWith
mapWith 是 map 的另外一个变种,map 只需要一个输入函数,而 mapWith 有两个输入函数。
spark 安装
- 资料
- 安装
wget http://apache.spinellicreations.com/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz
tar zxf spark-1.6.1-bin-hadoop2.6.tgz
mv spark-1.6.1-bin-hadoop2.6 spark
mv -f spark ~/app/
vi ~/.bash_profile
source ~/.bash_profile
- 启动 spark
进入 scala 命令行
- hello world
scala println(hello world)
hello world
spark IDE
下载并安装 JDK
下载并安装 IDEA
下载并安装 SCALA
准备好 spark 的 lib 包
添加 IDEA 的 SCALA 插件 File- Settings- Plugins- 搜索 Scala,并安装 Scala 插件
新建项目 File- New Project- 选择 Scala- next- project name location – Finish
添加 spark 的 lib 包“File”–“project structure”–“Libraries”,选择“+”,将 spark-hadoop 对应的包导入
新建 SparkPi 类(源码见 $SPARKHOME$/examples/src/main/scala/org/apache/spark/examples)新建包:org.apache.spark.examples 新建 Scala 类:SparkPi
// scalastyle:off println
package org.apache.spark.examples
import scala.math.random
import org.apache.spark._
/** Computes an approximation to pi */
object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName(Spark Pi) // 本地运行加.setMaster(local)
val spark = new SparkContext(conf)
val slices = if (args.length 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.parallelize(1 until n, slices).map { i =
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y 1) 1 else 0
}.reduce(_ + _)
println(Pi is roughly + 4.0 * count / n)
// scalastyle:on println
上传至 linux 服务器,执行命令
$SPARK_HOME$/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://updev4:7077 /home/solr/sparkPi.jar
输出结果:Pi is roughly 3.13662
