共计 12688 个字符,预计需要花费 32 分钟才能阅读完成。
本篇内容主要讲解“Spark Streaming 编程技巧是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“Spark Streaming 编程技巧是什么”吧!
#Spark Streaming 编程指南 #
## 概述 ## Spark Streaming 是核心 Spark API 的一个扩展,他可以实现高吞吐量,和容错的实时数据流处理。
他可以接受许多数据源例如 Kafka、Flume、Twitter、ZeroMQ 或者普通的老的 TCP 套接字的数据。数据可以使用拥有高级函数例如 map、reduce、join、和 window 的复杂算法表达式进行处理。最终,处理的数据可以被推送到文件系统、数据库和在线仪表盘。实际上,你可以在数据流上应用 Spark 内置的机器学习算法和图处理算法。
img src= https://cache.yisu.com/upload/information/20210522/355/658120.png /
在内部,它的工作原理如下。Spark Streaming 接收实时输入数据流,并且将数据分割成 batches,which are then processed by the Spark engine to generate the final stream of results in batches.
img src= https://cache.yisu.com/upload/information/20210522/355/658121.png /
Spark Streaming 提供一个高级的抽象叫做离散流,或者 DStream。它表示一个连续不断的数据流。DStreams 既可以通过来自数据源例如 Kafka、Flume 的数据数据流创建,也可以通过在其他 DStreams 上应用高级操作创建。在内部,一个 DStream 被表示成一个 RDDs 的序列。
本指南向你展示如何使用 DStreams 开始编写 Spark Streaming 程序。你可以使用 Scala 或 Java 编写 Spark Streaming 程序,本指南中两者都提供。你将会发现 tabs 贯穿全文,可以让你在 Scala 和 Java 代码片段中选择。
## 一个简单的例子 ## 在我们进入如何编写你自己的 Spark Streaming 程序的细节之前,让我们快速的看下一个简单的 Spark Streaming 程序是怎样的。比如说,我们想计算一个通过监听 TCP 套接字得到的数据服务器上的文本数据中单词的总数。所有你需要做的如下:
首先,我们创建一个 JavaStreamingContext 对象,他是所有 Streaming 功能的一个切入点。除了 Spark 的配置,we specify that any DStream would be processed in 1 second batches.
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a StreamingContext with a local master
JavaStreamingContext jssc = new JavaStreamingContext(local[2] , JavaNetworkWordCount , new Duration(1000))
使用这个 context,我们通过指定 IP 地址和数据服务器的端口来创建一个新的 DStream。
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
JavaReceiverInputDStream String lines = jssc.socketTextStream(localhost , 9999);
这个 DStream lines 表示数据的流将会从这个数据服务器接收。流中的每一条记录都是一行文本。然后,我们通过空格将行分割成单词。
// Split each line into words
JavaDStream String words = lines.flatMap( new FlatMapFunction String, String () { @Override public Iterable String call(String x) { return Arrays.asList(x.split( ));
}
});
flatMap 是一个 DStream 操作,它通过使源 DStream 中的每一条记录生成许多新的记录而创建一个新的 DStream。在这个例子中,每一行将会被分割成多个 words,words 流被表示成 words DStream。注意,我们定义使用 FlatMapFunction 对象转换。正如我们一直在探索,在 Java API 中有许多这样的转换类来帮助定义 DStream 转换。
接下俩,我们想要计算这些 words 的和:
// Count each word in each batch
JavaPairDStream String, Integer pairs = words.map( new PairFunction String, String, Integer () { @Override public Tuple2 String, Integer call(String s) throws Exception { return new Tuple2 String, Integer (s, 1);
}
});
JavaPairDStream String, Integer wordCounts = pairs.reduceByKey( new Function2 Integer, Integer, Integer () { @Override public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
wordCounts.print(); // Print a few of the counts to the console
使用一个 PairFunction,words DStream 被进一步 mapped(一对一转换)成一个 DStream 对 (word,1)。然后,使用 Function2 对象,it is reduced to get the frequency of words in each batch of data。最后,wordCounts.print() 将会每秒打印一些生成的和。
Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
完整的代码可以再 Spark Streaming example JavaNetworkWordCount 找到。
如果你已经下载并且构建了 Spark,你可以像下面这样运行这个例子。你需要首先运行 Netcat(一个可以再大多数 Unix-like 系统上找到的小工具)作为一个数据服务器,通过:
$ nc -lk 9999
然后,在一个不同的终端下,亦可以启动这个例子,通过:
$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
然后,在运行 netcat 服务的终端中输入的每一行将会被求和并且每秒打印在屏幕上。他看起来像这样:
pre # TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world … # TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount $ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 … ——————————————- Time: 1357008430000 ms ——————————————- (hello,1) (world,1) … /pre
你也可以在 Spark shell 直接使用 Spark Streaming:
$ bin/spark-shell
… 并且通过封装已存在的交互式 shell SparkContext 对象 sc 来创建你的 StreamingContext:
val ssc = new StreamingContext(sc, Seconds(1))
When working with the shell, you may also need to send a ^D to your netcat session to force the pipeline to print the word counts to the console at the sink.
## 基础知识 ## 现在,我们 move beyond the simple example,我们详细阐述编写一个 streaming 应用程序你需要了解的 Spark Streaming 的基础知识。
### 接入 ### 要编写你自己的 Spark Streaming 程序,你将需要添加下面的依赖到你的 SBT 或者 Maven 项目中:
groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 1.0.2
对于从像 Kafka 和 Flume 这样的数据源获取数据的功能,现在已经出现在 Spark Streaming 核心 API 里。你将需要添加相应的 attiface spark-streaming-xyz_2.10 到依赖。例如,下面是一些常见的:
pre Source Artifact Kafka spark-streaming-kafka_2.10 Flume spark-streaming-flume_2.10 Twitter spark-streaming-twitter_2.10 ZeroMQ spark-streaming-zeromq_2.10 MQTT spark-streaming-mqtt_2.10 /pre
罪行的列表,请参考 Apache repository 获得所有支持的源和 artifacts 的列表。
### 初始化 ### 在 Java 中,要初始化一个 Spark Streaming 程序,需要创建一个 JavaStreamingContext 对象,他是整个 Spark Streaming 功能的切入点。一个 JavaStreamingContext 对象可以被创建,使用:
new JavaStreamingContext(master, appName, batchInterval, [sparkHome], [jars])
master 参数是一个标准的 Spark 集群 URL,并且可以是“local”作为本地测试。appName 是你的程序的名字,它将会在你的集群的 Web UI中显示。batchInterval 是 batches 的大小,就像之前解释的。最后,如果运行为分布式模式,需要最后两个参数来部署你的代码到一个集群上,就像 Spark programming guide 描述的那样。此外,基本的 SparkContext 可以如同 ssc.sparkContext 这样访问。
batch internal 的设置必须根据你的应用程序的延迟要求和可用的集群资源。查看 Performance Tuning 获得更对详细信息。
###DStreams### Discretized Stream 或者说 DStream,是 Spark Streaming 提供的基本的抽象。它表示连续不断的数据流,或者来自数据源的输入数据流,或者通过转换输入流生成的经过处理的数据流。在内部,它通过一个连续不断的 RDDs 的序列表示,他是 Spark 的一个不可变得抽象,分布式数据器。Each RDD in a DStream contains data from a certain interval,就像下面的图表中展示的:
img src= https://cache.yisu.com/upload/information/20210522/355/658122.png /
应用在一个 DStream 上的任何操作转换成在基础的 RDDs 上面的操作。例如,in the earlier example of converting a stream of lines to words, the flatmap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream. 下面的图表展示了这个:
img src= https://cache.yisu.com/upload/information/20210522/355/658124.png /
这些基础的 RDD 转换是通过 Spark 引擎计算的。DStream 操作隐藏了大多数的细节并提供开发者方便的高级 API。这些操作在后面的章节中有详细讨论。
### 输入源 ### 我们已经在 [quick example](quick example) 看了 ssc.socketTextStream(…), 它通过一个 TCP 套接字连接接受文本数据创建了一个 DStream。除了套接字,核心 Spark Streaming API 提供了创建 DStream 通过文件,和将 Akka actors 作为输入源。
特别的,对于文件,DStream 可以这样创建:
jssc.fileStream(dataDirectory);
Spark Streaming 将会监视 dataDirectory 目录下的任何 Hadoop 兼容的文件系统,并且处理这个目录下创建的任何文件。
注意:
文件必须有统一的格式
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved the files must not be changed.
For more details on streams from files, Akka actors and sockets, see the API documentations of the relevant functions in StreamingContext for Scala and JavaStreamingContext for Java.
此外,通过源,例如 Kafka、Flume 和 Twitter 创建 DStream 的功能可以通过导入并添加正确的依赖,就像前面的章节中解释的那样。在 Kafka 的情况下,在添加 artifact spark-streaming-kafka_2.10 到项目的依赖后,你可以像这样创建一个来自 Kafka 的 DStream:
import org.apache.spark.streaming.kafka.*;
KafkaUtils.createStream(jssc, kafkaParams, ...);
更多关于附加源的细节,查看相应的 API 文档,此外,你可以实现你自己的源的定制接收者,查看 Custom Receiver Guide.
### 操作 ### 有两种 DStream 操作 - 转换和输出操作。和 RDD 转换类似,DStream 转换操作针对一个或者多个 DStream 来创建新的包含转换数据的 DStreams。在数据流上应用一系列转换后,输入操作需要调用,它写数据到一个额外的数据槽中,例如一个文件系统或者一个数据库。
#### 转换 #### DStream 支持许多转换,在一个普通的 Spark RDD 上。下面是一些常见的转换:
pre Transformation Meaning map(func) Return a new DStream by passing each element of the source DStream through a function func. flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items. filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true. repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions. union(otherStream) Return a new DStream that contains the union of the elements in the source DStream and otherDStream. count() Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. reduce(func) Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. countByValue() When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. reduceByKey(func, [numTasks]) When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. join(otherStream, [numTasks]) When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. cogroup(otherStream, [numTasks]) When called on DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. transform(func) Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. updateStateByKey(func) Return a new state DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. /pre
最后两个转换值得再次解释。
####UpdateStateByKey 操作 #### updateStateByKey 允许你维护任意的状态,同时,可以持续不断的更新新信息。使用它,你需要下面两步:
定义状态 - 状态可以是任意数据类型
定义状态更新函数 - 指定一个函数,怎样从之前的状态和新的输入流的值中更新状态
让我们使用一个例子阐述。假设我们想维护一个连续的一个文本流中的单词出现的次数。这里,连续的和是这个 state,并且是一个 Integer,我们定义 update 函数,像这样:
import com.google.common.base.Optional;
Function2 List Integer , Optional Integer , Optional Integer updateFunction =
new Function2 List Integer , Optional Integer , Optional Integer () { @Override public Optional Integer call(List Integer values, Optional Integer state) {
Integer newSum = ... // add the new values with the previous running count to get the new count
return Optional.of(newSum);
}
};
下面的应用在一个包含 words 的 DStream 上(假设,Pairs DStream 包含(word,1)对在 quick example)
update 函数将会被每一个 word 调用,with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count. 完整的 Scala 代码,查看例子 StatefulNetworkWordCount.
####Transform 操作 ####
####Window 操作 #### 最后,Spark Streaming 还提供了 window 计算。
####Output 操作 #### 当一个输出操作被调用,它出发一个流计算,目前,定义了下面的输出操作:
pre Output Operation Meaning print() Prints first ten elements of every batch of data in a DStream on the driver. foreachRDD(func) The fundamental output operator. Applies a function, func, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. saveAsObjectFiles(prefix, [suffix]) Save this DStream s contents as a SequenceFile of serialized objects. The file name at each batch interval is generated based on prefix and suffix: prefix-TIME_IN_MS[.suffix] . saveAsTextFiles(prefix, [suffix]) Save this DStream s contents as a text files. The file name at each batch interval is generated based on prefix and suffix: prefix-TIME_IN_MS[.suffix] . saveAsHadoopFiles(prefix, [suffix]) Save this DStream s contents as a Hadoop file. The file name at each batch interval is generated based on prefix and suffix: prefix-TIME_IN_MS[.suffix] . /pre
完整的 DStream 操作的列表可以在 API 文档得到。对于 Scala API,查看 DStream 和 PairDStreamFunctions, 对于 Java API,查看 JavaDStream 和 JavaPairDStream.
### 持久化 ### 类似于 RDDs,DStreams 同样允许开发者持久化流数据到内存。就是说,在一个 DStream 上使用 persist()将会自动的持久化这个 DStream 的每一个 RDD 到内存。如果这个 DStream 中的数据将会被计算多次(例如,在同样的数据上进行多个操作),这是非常有用的。对于基于 window 的操作例如 reduceByWondow 和 reduceByKeyAndWindow 和基于状态的操作例如 updateStateByKey,是默认持久化的。因此,通过基于 window 的操作生成的 DStream 是自动持久化到内存的,而不需要开发者去调用 persist()方法。
对于数据流来说,它通过 network(例如 Kafka,Flume,socket 等等)接收数据,它的默认的持久化级别是复制数据到两个节点,以便容错。
注意,不想 RDDs,DSteam 默认的持久化级别是保持数据在内存中序列化。在章节 Performance Tuning 有更多的讨论。更多关于不同持久化级别的信息可以在 Spark Programming Guide 找到。
###RDD Checkpointing### 一个 stateful 操作是那些在数据的多个 batches 上的操作。它包括所有基于 window 的操作和 updateStateByKey 操作。由于 stateful 操作依赖于之前数据的 batches,他们随着时间连续不断的聚集元数据。要清除这些数据,Spark Streaming 支持在存储中间数据到 HDFS 时进行定期的 checkpointing。
启用 checkpointing,开发者需要提供 RDD 将被保存的 HDFS 路径。通过以下代码完成:
ssc.checkpoint(hdfsPath) // assuming ssc is the StreamingContext or JavaStreamingContext
一个 DStream 的 checkpointing 的间隔可以这样设置:
dstream.checkpoint(checkpointInterval)
对于 DStream,他必须被 checkpointing(即,DStream 通过 updateStateByKey 创建,并且使用相反的函数 reduceByKeyAndWindow),DStream 的 checkpoint 间隔默认设置为 set to a multiple of the DStream’s sliding interval,例如至少设置 10 秒。
###Deployment### 和其他任何 Spark 应用程序一样,Spark Streaming 应用程序部署在集群上。请参考 deployment guide 获得更多信息。
如果一个正在运行的 Spark Streaming 应用程序需要升级(包括新的应用代码),这里有两个可能的技巧:
The upgraded Spark Streaming application is started and run in parallel to the existing application. Once the new one (receiving the same data as the old one) has been warmed up and ready for prime time, the old one be can be brought down. Note that this can be done for data sources that support sending the data to two destinations (i.e., the earlier and upgraded applications).
The existing application is shutdown gracefully (see StreamingContext.stop(…) or JavaStreamingContext.stop(…) for graceful shutdown options) which ensure data that have been received is completely processed before shutdown. Then the upgraded application can be started, which will start processing from the same point where the earlier application left off. Note that this can be done only with input sources that support source-side buffering (like Kafka, and Flume) as data needs to be buffered while the previous application down and the upgraded application is not yet up.
到此,相信大家对“Spark Streaming 编程技巧是什么”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!