Spark Streaming 编程权威使用指南

这篇具有很好参考价值的文章主要介绍了Spark Streaming 编程权威使用指南。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark Streaming 编程权威使用指南


注意:本文档为Spark的旧版本Streaming引擎。Spark Streaming 不再更新,是一个遗留项目。在Spark中有一种新的、更易用的流处理引擎,称为结构化流式处理。您应该使用Spark结构化流处理来开发流式应用和流水线。请参阅结构化流式处理编程指南。

概述

Spark Streaming 是 Spark 核心 API 的扩展,支持可扩展、高吞吐量、容错的实时数据流处理。可以从多个来源(如Kafka、Kinesis或TCP sockets)摄取数据,并使用高级函数(如map、reduce、join和window)执行复杂算法进行处理。最后,处理后的数据可以推送到文件系统、数据库和实时仪表板。事实上,可以在数据流上应用 Spark 的机器学习和图处理算法。

Spark Streaming 的内部工作原理如下。它接收实时输入数据流并将数据划分为批次,然后由 Spark 引擎处理生成最终结果流的批次。

Spark Streaming 提供了一个称为离散化流(Discretized Stream)或 DStream 的高级抽象,它表示连续的数据流。DStream 可以通过从诸如 Kafka 和 Kinesis 等源创建输入数据流,或者通过对其他 DStream 应用高级操作来创建。在内部,DStream 被表示为 RDD(弹性分布式数据集)的序列。

本指南向您展示如何使用 DStream 编写 Spark Streaming 程序。本指南介绍了在Scala、Java或Python(从Spark 1.2开始引入)中编写 Spark Streaming 程序的方法。在本指南的各个部分中,您将找到可以选择不同语言的代码片段的选项卡。

注意:在 Python 中有一些与其他语言不同或不可用的 API。在本指南中,您将找到标记为 Python API 的标签来突出显示这些差异。

快速示例

在我们深入研究如何编写自己的 Spark Streaming 程序之前,让我们快速看一下一个简单的 Spark Streaming 程序是什么样子的。假设我们想要计算从监听 TCP socket 上接收的文本数据中的单词数量。您只需要执行以下操作。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 创建具有两个工作线程和批处理间隔为1秒的本地StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# 创建将连接到 hostname:port(例如localhost:9999)的DStream
lines = ssc.socketTextStream("localhost", 9999)

# 将每行拆分为单词
words = lines.flatMap(lambda line: line.split(" "))

# 计算每批次中的每个单词的数量
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)

# 将每个RDD生成的前十个元素打印到控制台
wordCounts.pprint()

# 开始计算
ssc.start()
# 等待计算结束
ssc.awaitTermination()

完整的代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。

如果您已经下载并构建了 Spark,可以按照以下方式运行此示例。首先,您需要使用 Netcat(大多数类Unix系统中都有的小工具)作为数据服务器运行:

$ nc -lk 9999

然后,在另一个终端中,您可以使用以下命令启动示例:

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999

然后,您在运行 netcat 服务器的终端中键入的任何行都将被计数并每秒打印在屏幕上。效果如下:

-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...

基本概念

接下来,我们将超越简单示例,详细介绍 Spark Streaming 的基本知识。

链接

与 Spark 类似,Spark Streaming 可通过 Maven Central 获得。要编写自己的 Spark Streaming 程序,您需要将以下依赖项添加到 SBT 或 Maven 项目中。

Maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.5.0</version>
    <scope>provided</scope>
</dependency>

对于像 Kafka 和 Kinesis 这样的不在 Spark Streaming 核心 API 中的数据源,您需要将相应的 artifact spark-streaming-xyz_2.12 添加到依赖项中。例如,一些常见的数据源如下:

数据源 Artifact
Kafka spark-streaming-kafka-0-10_2.12
Kinesis spark-streaming-kinesis-asl_2.12(Amazon Software License)
有关最新列表,请参阅 Maven 存储库以获取支持的所有数据源和 artifact 的完整列表。

初始化 StreamingContext

要初始化 Spark Streaming 程序,必须创建一个 StreamingContext 对象,该对象是所有 Spark Streaming 功能的主要入口点。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)

appName 参数是用于在集群 UI 上显示的应用程序名称。master 是 Spark、Mesos 或 YARN 集群的 URL,或者是一个特殊的 “local[]" 字符串,表示在本地模式下运行。实际上,在集群上运行时,您不会想在程序中硬编码 master,而是使用 spark-submit 在其中启动应用程序。然而,在本地测试和单元测试中,可以传递 "local[]” 以在进程内运行 Spark Streaming(检测本地系统中的核心数)。

批处理间隔必须根据应用程序的延迟要求和可用集群资源进行设置。有关详细信息,请参见性能调整部分。

定义上下文后,您需要执行以下操作:

  • 通过创建输入 DStream 定义输入源。
  • 通过对 DStream 应用转换和输出操作来定义流计算。
  • 使用 streamingContext.start() 接收数据并进行处理。
  • 使用 streamingContext.awaitTermination() 等待处理结束(手动或由于任何错误)。
  • 使用 streamingContext.stop() 可以手动停止处理。

请记住:

  • 一旦上下文已启动,就无法向其设置或添加新的流计算。
  • 一旦上下文停止,就无法重新启动。
  • 一个 JVM 中只能同时存在一个 StreamingContext。
  • 调用 stop() 时会同时停止 SparkContext。如果只想停止 StreamingContext,请将 stopSparkContext 参数设置为 false。
  • 可以重复使用 SparkContext 来创建多个 StreamingContext,只要在创建下一个 StreamingContext 之前停止上一个 StreamingContext(而不是停止 SparkContext)即可。

离散化流(DStreams)

离散化流(Discretized Stream)或 DStream 是 Spark Streaming 提供的基本抽象。它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的处理后的数据流。在内部,DStream 由一系列 RDD(弹性分布式数据集)表示,RDD 是 Spark 对不可变分布式数据集的抽象(有关详细信息,请参阅 Spark 编程指南)。DStream 中的每个 RDD 包含来自特定间隔的数据,如下图所示。

Spark Streaming 编程权威使用指南,spark,大数据,流数据处理#大数据,spark,大数据,分布式

任何应用于 DStream 的操作都会转化为对底层 RDD 的操作。例如,在将行流转换为单词流的示例中,flatMap 操作将应用于 lines DStream 中的每个 RDD,以生成 words DStream 中的 RDD。如下图所示。

Spark Streaming 编程权威使用指南,spark,大数据,流数据处理#大数据,spark,大数据,分布式

这些底层 RDD 转换由 Spark 引擎计算。DStream 操作隐藏了大多数这些细节,并为开发人员提供了更高级别的 API 以提供方便。这些操作将在后面的章节中详细讨论。

输入 DStreams 和 Receivers

输入 DStream 是表示从流式源接收到的输入数据流的 DStream。在快速示例中,lines 是一个输入 DStream,因为它表示从 netcat 服务器接收到的数据流。除文件流之外,每个输入 DStream 都与一个 Receiver(Scala 文档,Java 文档)对象相关联,该对象从源接收数据并将其存储在 Spark 的内存中进行处理。

Spark Streaming 提供了两类内置的流式源。

基本源:直接在 StreamingContext API 中提供的源。例如:文件系统和 socket 连接。
高级源:通过额外的实用程序类提供的 Kafka、Kinesis 等源。这些需要链接到额外的依赖项,如链接部分所述。
我们将在本节后面讨论每个类别中存在的一些源。

请注意,如果要在流式应用程序中并行接收多个数据流,可以创建多个输入 DStream(在性能调整部分中进一步讨论)。这将创建多个接收器,它们将同时接收多个数据流。但请注意,Spark worker/executor 是一个长时间运行的任务,因此它占用了分配给 Spark Streaming 应用程序的一个核心。因此,重要的是要记住,Spark Streaming 应用程序需要分配足够的核心(或线程,如果在本地运行)来处理接收到的数据以及运行接收器。

基本源

我们已经看过了快速示例中的 ssc.socketTextStream(...),它使用文本数据从 TCP socket 连接创建了一个 DStream。除了 sockets,StreamingContext API 还提供了从文件作为输入源创建 DStream 的方法。

文件流

要从与 HDFS API 兼容的任何文件系统(如 HDFS、S3、NFS 等)中读取数据,可以通过 StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass] 创建一个 DStream。

文件流不需要运行接收器,因此不需要为接收文件数据分配任何核心。

对于简单的文本文件,最简单的方法是使用 StreamingContext.textFileStream(dataDirectory)

# 使用 dataDirectory 创建 DStream,dataDirectory 是目录的路径,如 "hdfs://namenode:8040/logs/"
lines = ssc.textFileStream(dataDirectory)

如何监视目录:

  • Spark Streaming 将监视 dataDirectory 目录并处理在该目录中创建的任何文件。
  • 可以监视简单的目录,例如 “hdfs://namenode:8040/logs/”。所有直接位于此路径下的文件都将在发现时被处理。
  • 可以提供 POSIX glob 模式,例如 “hdfs://namenode:8040/logs/2017/*”。在这种情况下,DStream 将包含与模式匹配的目录中的所有文件。也就是说:它是目录的模式,而不是目录中的文件的模式。
  • 所有文件必须具有相同的数据格式。
  • 文件被认为是属于一个时间段,基于其修改时间,而不是创建时间。
  • 处理后,当前窗口内的文件的更改不会导致重新读取文件。也就是说:更新将被忽略。
  • 目录中的文件越多,扫描更改所需的时间就越长 - 即使没有文件被修改。
  • 如果使用通配符标识目录(例如 “hdfs://namenode:8040/logs/2016-*”),将整个目录重命名以匹配路径将会将该目录添加到受监视的目录列表中。只有修改时间在当前窗口内的目录中的文件将包括在流中。
  • 调用 FileSystem.setTimes() 来修复时间戳是一种方式,可以使文件在稍后的窗口中被接收,即使其内容没有发生变化。

使用对象存储作为数据源

“完全”文件系统(如 HDFS)在创建输出流时往往会立即设置其文件的修改时间。当一个文件被打开时,即使数据尚未完全写入,它也可能被包括在 DStream 中 - 之后,在同一窗口内对文件进行更新将被忽略。也就是说:可能会错过更改,并且数据会从流中省略。

要确保更改在窗口中被捕获,请将文件写入未监视的目录,然后在关闭输出流后立即将其重命名为目标目录。只要在窗口的创建期间扫描的目标目录中出现重命名的文件,新数据就会被捕获。

相比之下,对象存储(如 Amazon S3 和 Azure 存储)通常具有较慢的重命名操作,因为数据实际上是复制的。此外,重命名的对象的修改时间可能是 rename() 操作的时间,因此可能不被视为原始创建时间所暗示的窗口的一部分。

需要对目标对象存储进行仔细测试,以验证其时间戳行为是否与 Spark Streaming 预期的一致。也许直接将文件写入目标目录是通过所选择的对象存储流式传输数据的适当策略。

有关此主题的更多详细信息,请参阅 Hadoop 文件系统规范。

基于自定义接收器的流

DStream 可以使用通过自定义接收器接收的数据流创建。有关详细信息,请参阅自定义接收器指南。

作为流的 RDD 队列

为了使用测试数据测试 Spark Streaming 应用程序,还可以基于 RDD 队列创建一个 DStream,使用 streamingContext.queueStream(queueOfRDDs)。推入队列的每个 RDD 都将被视为 DStream 中的数据批次,并像流一样进行处理。

有关来自 sockets 和 files 的流的更多详细信息,请参阅 Scala API 中的 StreamingContext、Java API 中的 JavaStreamingContext 和 Python API 中的 StreamingContext 的相关函数的 API 文档。

高级源

Python API:截至Spark 3.5.0,这些源中的 Kafka 和 Kinesis 在 Python API 中可用。

这类源需要与外部非Spark库进行交互,其中一些库具有复杂的依赖关系(如Kafka)。因此,为了最小化依赖项的版本冲突问题,从这些源创建 DStream 的功能已移动到可以在需要时显式链接的单独库中。

请注意,这些高级源在 Spark shell 中不可用,因此基于这些高级源的应用程序无法在 shell 中进行测试。如果您确实希望在 Spark shell 中使用它们,您将需要下载相应的 Maven artifact 的 JAR 文件以及其依赖项,并将其添加到类路径中。

以下是其中一些高级源:

  • Kafka:Spark Streaming 3.5.0 兼容 Kafka 版本 0.10 或更高版本。有关详细信息,请参阅 Kafka 集成指南。
  • Kinesis:Spark Streaming 3.5.0 兼容 Kinesis 客户端库 1.2.1。有关详细信息,请参阅 Kinesis 集成指南。

自定义源

Python API:在 Python 中尚不支持此功能。

输入 DStream 还可以通过自定义数据源创建。您只需要实现一个用户定义的接收器(有关详细信息,请参阅下一节),该接收器可以从自定义源接收数据并将其推送到 Spark。有关详细信息,请参阅自定义接收器指南。

接收器的可靠性

根据其可靠性,数据源可以分为两种类型。某些源(如 Kafka)允许确认传输的数据。如果接收来自这些可靠源的数据的系统正确地确认了接收到的数据,就可以确保不会由于任何故障而丢失数据。这导致了两种接收器:

  • 可靠接收器:可靠接收器在将数据接收并存储在具有复制的 Spark 中时,会向可靠源正确发送确认。
  • 不可靠接收器:不可靠接收器不向源发送确认。这可以用于不支持确认的源,甚至对于可靠源,当不想或不需要进入确认的复杂性时。

如何编写可靠接收器的详细信息,请参阅自定义接收器指南。

在 DStreams 上的转换操作

与 RDD 类似,转换操作允许修改输入 DStream 的数据。DStream 支持许多可用于常规 Spark RDD 的转换。其中一些常见的转换如下:

转换 含义
map(func) 通过将源 DStream 的每个元素传递给函数 func 来返回一个新的 DStream。
flatMap(func) 类似于 map,但每个输入项可以映射到 0 个或多个输出项。
filter(func) 返回一个新的 DStream,其中仅选择源 DStream 的 func 返回 true 的记录。
repartition(numPartitions) 通过创建更多或更少的分区来更改此 DStream 中的并行级别。
union(otherStream) 返回一个新的 DStream,其中包含源 DStream 和 otherDStream 中元素的并集。
count() 通过计算源 DStream 中每个 RDD 中的元素数量返回一个新的单元素 RDD 的 DStream。
reduce(func) 通过使用给定的 reduce 函数 func(接受两个参数并返回一个)在源 DStream 的每个 RDD 中聚合元素,从而返回一个新的单元素 RDD 的 DStream。函数应该是可关联和可交换的,以便可以并行计算。
countByValue() 当在类型为 K 的 DStream 上调用时,返回一个新的 (K, Long) 对的 DStream,其中每个键的值是其在源 DStream 的每个 RDD 中的频率。
reduceByKey(func, [numTasks]) 在类型为 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对的 DStream,其中每个键的值都使用给定的 reduce 函数进行聚合。注意:默认情况下,这会使用 Spark 的默认并行任务数(对于本地模式为 2,在集群模式中,该数字由配置属性 spark.default.parallelism 确定)进行分组。您可以传递一个可选的 numTasks 参数来设置不同的任务数。
join(otherStream, [numTasks]) 在类型为 (K, V) 和 (K, W) 对的两个 DStream 上调用时,返回一个新的 (K, (V, W)) 对的 DStream,其中每个键的所有元素对都会合并。
cogroup(otherStream, [numTasks]) 在类型为 (K, V) 和 (K, W) 对的 DStream 上调用时,返回一个新的 (K, Seq[V], Seq[W]) 元组的 DStream。
transform(func) 通过将 RDD 到 RDD 函数应用于源 DStream 的每个 RDD 来返回一个新的 DStream。这可用于在 DStream 上执行任意的 RDD 操作。
updateStateByKey(func) 返回一个新的“状态”DStream,其中每个键的状态通过应用给定函数 func 来更新键的先前状态和键的新值而更新。这可用于为每个键维护任意状态数据。

其中一些转换值得更详细地讨论。

updateStateByKey 操作

updateStateByKey 操作允许在不断更新的过程中维护任意状态。要使用此功能,需要执行两个步骤。

  1. 定义状态:状态可以是任意数据类型。
  2. 定义状态更新函数:使用一个函数来指定如何使用上一个状态和来自输入流的新值来更新状态。

在每个批次中,Spark 将对所有现有键应用状态更新函数,无论它们在批次中是否有新数据。如果更新函数返回 None,则键值对将被删除。

让我们通过一个示例来说明这个问题。假设您想要在文本数据流中维护每个单词的运行计数。这里,运行计数是状态,并且它是一个整数。我们将更新函数定义为:

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # 将新值与先前的运行计数相加以获得新的计数

这将应用于包含单词的 DStream(例如,在前面示例中包含 (word, 1) 对的 pairs DStream)。

runningCounts = pairs.updateStateByKey(updateFunction)

对于完整的 Python 代码,请参阅 stateful_network_wordcount.py 示例。

请注意,使用 updateStateByKey 需要配置检查点目录,在检查点部分中详细讨论了此问题。

transform 操作

transform 操作(以及 transformWith 等变体)允许在 DStream 上应用任意 RDD 到 RDD 函数。可以使用它来执行在 DStream API 中没有直接暴露的任何 RDD 操作。例如,在数据流中的每个批次与另一个数据集进行连接的功能并不直接暴露在 DStream API 中。但是,您可以轻松地使用 transform 来实现这一点。这提供了非常强大的可能性。例如,可以通过将输入数据流与预先计算的垃圾信息(可能也是使用 Spark 生成的)进行连接,然后基于此进行过滤来实现实时数据清理。

spamInfoRDD = sc.pickleFile(...)  # 包含垃圾信息的 RDD

# 将数据流与垃圾信息连接以进行数据清理
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))

请注意,提供的函数在每个批次间隔中被调用。这使您可以执行时间变化的 RDD 操作,即在批次之间可以更改 RDD 操作、分区数、广播变量等。

窗口操作

Spark Streaming 还提供了窗口计算,允许您在滑动窗口的数据上应用转换。下图展示了这个滑动窗口。

Spark Streaming 编程权威使用指南,spark,大数据,流数据处理#大数据,spark,大数据,分布式

如图所示,每当窗口在源 DStream 上滑动时,落在窗口内的源 RDD 被合并并操作以生成窗口化 DStream 的 RDD。在这种特定情况下,该操作应用于最后 3 个时间单位的数据,并且每 2 个时间单位滑动一次。这表明任何窗口操作都需要指定两个参数。

  • 窗口长度(window length):窗口的持续时间(在图中为 3)。
  • 滑动间隔(sliding interval):在窗口操作被执行的间隔(在图中为 2)。

这两个参数必须是源 DStream 的批处理间隔的倍数(在图中为 1)。

让我们通过一个示例来说明窗口操作。假设您想要通过对过去 30 秒的数据应用 reduceByKey 操作,每隔 10 秒计算一次单词计数。为此,我们需要使用 reduceByKeyAndWindow 操作在包含 (word, 1) 对的 pairs DStream 上对过去 30 秒的数据进行操作。

# 过去 30 秒的数据,每 10 秒减少一次
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

一些常见的窗口操作如下。所有这些操作都采用上述两个参数 - 窗口长度和滑动间隔。

转换 含义
window(windowLength, slideInterval) 返回基于源 DStream 的窗口化批次计算而得到的新 DStream。
countByWindow(windowLength, slideInterval) 返回流中元素的滑动窗口计数。
reduceByWindow(func, windowLength, slideInterval) 返回一个新的单元素流,通过使用 func 在滑动间隔内聚合流中的元素来创建。该函数应具有关联性和可交换性,以便可以并行计算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 在类型为 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对的 DStream,其中每个键的值使用给定的 reduce 函数 func 在滑动窗口中的批次上进行聚合。注意:默认情况下,这会使用 Spark 的默认并行任务数(对于本地模式为 2,在集群模式中,该数字由配置属性 spark.default.parallelism 确定)进行分组。您可以传递一个可选的 numTasks 参数来设置不同的任务数。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 上述 reduceByKeyAndWindow() 的更高效版本,其中通过减少进入滑动窗口的新数据来增量计算每个窗口的缩减值,并通过“反向减少”离开窗口的旧数据。一个示例是在窗口滑动时“添加”和“减去”键的计数。但是,它仅适用于“可逆减少函数”,即那些具有相应的“反向减少”函数(以 invFunc 作为参数)。与 reduceByKeyAndWindow 类似,默认情况下,通过可选参数可以配置减少任务的数量。请注意,必须启用检查点才能使用此操作。
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 在类型为 (K, V) 对的 DStream 上调用时,返回一个新的 (K, Long) 对的 DStream,其中每个键的值是其在滑动窗口内的频率。与 reduceByKeyAndWindow 类似,通过可选参数可以配置减少任务的数量。

Join操作

最后,值得强调的是在Spark Streaming中如何轻松地执行不同类型的join操作。

流-流(join)

流可以非常容易地与其他流进行join操作。

stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)

在每个批次间隔中,由stream1生成的RDD将与由stream2生成的RDD进行join。你还可以执行leftOuterJoin、rightOuterJoin和fullOuterJoin。此外,对流进行窗口join也很有用,这也非常简单。

windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)

流-数据集(join)

在前面的DStream.transform操作中已经展示过这个操作。这里是一个将窗口流与数据集进行join的例子。

dataset = ...  # 一些RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))

事实上,你还可以动态地更改要进行join的数据集。传递给transform函数的函数会在每个批次间隔中评估一次,并且会使用数据集引用指向的当前数据集。

完整的DStream转换列表可以在API文档中找到。对于Scala API,请参见DStream和PairDStreamFunctions。对于Java API,请参见JavaDStream和JavaPairDStream。对于Python API,请参见DStream。

DStreams的输出操作

输出操作允许将DStream的数据推送到外部系统,例如数据库或文件系统。由于输出操作实际上允许外部系统消费转换后的数据(类似于RDD的操作),它们触发所有DStream转换的实际执行(类似于RDD的动作)。目前定义了以下输出操作:

输出操作 含义
print() 在运行流应用程序的驱动节点上打印每个批次数据的前十个元素。这对于开发和调试非常有用。
saveAsTextFiles(prefix, [suffix]) 将DStream的内容保存为文本文件。每个批次间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS[.suffix]”。
saveAsObjectFiles(prefix, [suffix]) 将DStream的内容保存为序列化的Java对象的SequenceFiles。每个批次间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS[.suffix]”。
saveAsHadoopFiles(prefix, [suffix]) 将DStream的内容保存为Hadoop文件。每个批次间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS[.suffix]”。
foreachRDD(func) 最通用的输出操作,对从流生成的每个RDD应用一个函数func。此函数应该将每个RDD中的数据推送到外部系统,例如将RDD保存到文件中,或将其通过网络写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中有RDD操作,以强制计算流RDDs。

使用foreachRDD的设计模式

dstream.foreachRDD是一个非常强大的原语,允许将数据发送到外部系统。然而,正确和高效地使用该原语很重要。以下是一些需要避免的常见错误。

通常,将数据写入外部系统需要创建一个连接对象(例如,与远程服务器的TCP连接)并使用它将数据发送到远程系统。出于这个目的,开发人员可能会不经意地尝试在Spark驱动程序中创建连接对象,然后尝试在Spark工作节点上使用它来保存RDD中的记录。例如(在Scala中):

def sendRecord(rdd):
    connection = createNewConnection()  # 在驱动程序中执行
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)

这是不正确的,因为这要求将连接对象序列化并从驱动程序发送到工作节点。这样的连接对象很少能够跨机器传输。这个错误可能会表现为序列化错误(连接对象不可序列化),初始化错误(连接对象需要在工作节点上初始化),等等。正确的解决方案是在工作节点上创建连接对象。

然而,这可能导致另一个常见错误 - 为每条记录创建一个新的连接。例如:

def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))

通常,创建连接对象需要时间和资源开销。因此,为每条记录创建和销毁连接对象可能会导致不必要的高开销,并且可能会显著降低系统的整体吞吐量。更好的解决方案是使用rdd.foreachPartition - 创建一个单独的连接对象,并使用该连接对象发送RDD分区中的所有记录。

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

这样可以将连接创建开销分摊到多个记录上。

最后,通过在多个RDD/批次之间重用连接对象,可以进一步优化性能。可以维护一个静态的连接对象池,当推送多个批次的RDD到外部系统时,可以重用该池中的连接对象,从而进一步减少开销。

def sendPartition(iter):
    # ConnectionPool是一个静态、延迟初始化的连接池
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # 返回到池中以供将来重用
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

请注意,连接池中的连接应该按需懒惰地创建,并在一段时间内不使用时超时。这样可以实现将数据最有效地发送到外部系统。

其他需要记住的事项:
DStreams由输出操作惰性执行,就像RDD由RDD动作惰性执行一样。特别是,DStream输出操作中的RDD动作会强制处理接收到的数据。因此,如果您的应用程序没有任何输出操作,或者具有类似dstream.foreachRDD()但其中没有任何RDD动作的输出操作,则不会执行任何操作。系统将只接收数据并丢弃它。

默认情况下,输出操作是逐个执行的。它们按照在应用程序中定义的顺序执行。

DataFrame和SQL操作

你可以在流数据上轻松使用DataFrame和SQL操作。你必须使用StreamingContext所使用的SparkContext创建一个SparkSession。此外,这必须以一种可以在驱动程序失败后重新启动的方式完成。通过创建一个惰性实例化的单例SparkSession实例来实现这一点。下面是一个示例,修改了之前的单词计数示例,使用DataFrame和SQL生成单词计数。每个RDD被转换为DataFrame,注册为临时表,然后使用SQL进行查询。

# 惰性实例化的全局SparkSession实例
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# 在流程序中的DataFrame操作

words = ...  # 字符串的DStream

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # 获取SparkSession的单例实例
        spark = getSparkSessionInstance(rdd.context.getConf())

        # 将RDD[String]转换为RDD[Row]以创建DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # 使用DataFrame创建临时视图
        wordsDataFrame.createOrReplaceTempView("words")

        # 使用SQL对表进行单词计数并打印结果
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)

你还可以在来自不同线程的表上运行SQL查询(即与正在运行的StreamingContext异步)。只需确保将StreamingContext设置为记住足够数量的流数据,以便查询可以运行。否则,StreamingContext将删除旧的流数据,然后查询可能无法完成。例如,如果要查询最后一个批次,但是查询需要5分钟才能完成,则调用streamingContext.remember(Minutes(5))(在Scala中),或者其他语言中的等效方式。

有关如何使用DataFrame运行SQL查询的更多信息,请参见DataFrame和SQL指南。

MLlib操作

你还可以轻松使用MLlib提供的机器学习算法。首先,有流式机器学习算法(例如,流式线性回归、流式KMeans等),它们可以同时从流式数据中学习和应用模型。除此之外,对于更大类别的机器学习算法,你可以离线学习一个学习模型(即使用历史数据),然后在线将该模型应用于流式数据。请参阅MLlib指南以获取更多详细信息。

缓存/持久化

与RDD一样,DStreams也允许开发人员将流的数据持久化在内存中。也就是说,在DStream上使用persist()方法将自动在内存中持久化该DStream的每个RDD。如果DStream中的数据将被多次计算(例如,对同一数据进行多次操作),这将非常有用。对于基于窗口的操作(如reduceByWindow和reduceByKeyAndWindow)和基于状态的操作(如updateStateByKey),这是隐式的。因此,通过窗口操作生成的DStreams会自动在内存中持久化,而无需开发人员调用persist()。

对于接收来自网络的输入流(如Kafka、sockets等),默认的持久化级别设置为将数据复制到两个节点以实现容错性。

请注意,与RDD不同,DStreams的默认持久化级别会将数据序列化存储在内存中。这在性能调优部分进行了进一步讨论。有关不同持久化级别的更多信息,请参见Spark编程指南。

检查点

流应用程序必须全天候运行,因此必须能够从与应用程序逻辑无关的故障中恢复(例如,系统故障、JVM崩溃等)。为了实现这一点,Spark Streaming需要将足够的信息检查点到容错存储系统中,以便在发生故障时可以从故障中恢复。有两种类型的数据被检查点保存。

元数据检查点 - 将定义流计算的信息保存到容错存储(如HDFS)中。这用于从运行流应用程序的节点失败中恢复(详细讨论请参见后面的章节)。元数据包括:

  • 配置 - 用于创建流应用程序的配置。
  • DStream操作 - 定义流应用程序的DStream操作集合。
  • 未完成的批次 - 批次的作业已排队但尚未完成。

数据检查点 - 将生成的RDD保存到可靠存储中。这对于一些状态转换是必要的,它们将数据跨多个批次进行组合。在这样的转换中,生成的RDD依赖于前几个批次的RDD,这导致依赖链的长度随时间的增加而不断增加。为了避免这种依赖链无限增长而导致恢复时间增加(与依赖链成比例),需要定期将状态转换的中间RDD检查点到可靠存储(如HDFS)中,以切断依赖链。

总之,元数据检查点主要用于从驱动程序失败中恢复,而数据或RDD检查点即使在基本功能中也是必需的,如果使用了状态转换。

何时启用检查点
对于具有以下任何要求的应用程序,必须启用检查点:

  • 使用状态转换 - 如果在应用程序中使用了updateStateByKey或reduceByKeyAndWindow(带有逆函数),则必须提供检查点目录,以便进行定期的RDD检查点。
  • 从运行应用程序的驱动程序故障中恢复 - 元数据检查点用于恢复进度信息。

请注意,可以在没有启用检查点的情况下运行简单的流应用程序。在这种情况下,从驱动程序故障中的恢复也将是部分的(某些接收但未处理的数据可能会丢失)。这通常是可以接受的,许多人以这种方式运行Spark Streaming应用程序。预计在将来改进对非Hadoop环境的支持。

如何配置检查点
可以通过将目录设置为容错、可靠的文件系统(如HDFS、S3等)中的目录来启用检查点。这可以通过使用streamingContext.checkpoint(checkpointDirectory)来实现。这将允许你使用前面提到的状态转换。此外,如果您希望使应用程序能够从驱动程序故障中恢复,您应该重新编写流应用程序以具有以下行为。

  • 首次启动程序时,它将创建一个新的StreamingContext,设置所有流,并调用start()。
  • 在发生故障后重新启动程序时,它将从检查点目录中的检查点数据重新创建一个StreamingContext。
# 创建和设置新的StreamingContext的函数
def functionToCreateContext():
    sc = SparkContext(...)  # 新的上下文
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # 创建DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # 设置检查点目录
    return ssc

# 从检查点数据获取StreamingContext或创建一个新的
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# 对无论是启动还是重新启动执行的上下文进行额外设置
context. ...

# 启动上下文
context.start()
context.awaitTermination()

如果checkpointDirectory存在,则上下文将从检查点数据重新创建。如果目录不存在(即第一次运行),则会调用functionToCreateContext函数来创建一个新的上下文并设置DStreams。参见Python示例recoverable_network_wordcount.py。该示例将网络数据的单词计数附加到文件中。

你还可以使用StreamingContext.getOrCreate(checkpointDirectory, None)显式创建从检查点数据中创建StreamingContext并启动计算的StreamingContext。

除了使用getOrCreate之外,还需要确保驱动程序进程在失败时自动重新启动。这只能通过用于运行应用程序的部署基础架构来完成。部署部分将对此进行进一步讨论。

请注意,RDD的检查点会产生保存到可靠存储中的成本。这可能导致那些需要进行检查点的批次的处理时间增加。因此,需要仔细设置检查点间隔。在小的批次大小(例如1秒)下,每个批次检查点可能会显著降低操作吞吐量。相反,检查点间隔过大会导致血缘和任务大小增长,可能会产生不利影响。对于需要进行RDD检查点的状态转换,建议的默认间隔是批次间隔的倍数,至少为10秒。可以使用dstream.checkpoint(checkpointInterval)设置它。通常,将DStream的滑动间隔的5 - 10个滑动间隔作为检查点间隔是一个不错的设置。

累加器、广播变量和检查点

累加器和广播变量不能从Spark Streaming的检查点中恢复。如果启用了检查点并使用累加器或广播变量,你必须为累加器和广播变量创建惰性实例,以便它们在驱动程序在失败后重新实例化。下面是一个示例。

def getWordExcludeList(sparkContext):
    if ("wordExcludeList" not in globals()):
        globals()["wordExcludeList"] = sparkContext.broadcast(["a", "b", "c"])
    return globals()["wordExcludeList"]

def getDroppedWordsCounter(sparkContext):
    if ("droppedWordsCounter" not in globals()):
        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
    return globals()["droppedWordsCounter"]

def echo(time, rdd):
    # 获取或注册excludeList广播变量
    excludeList = getWordExcludeList(rdd.context)
    # 获取或注册droppedWordsCounter累加器
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # 使用excludeList删除单词,并使用droppedWordsCounter进行计数
    def filterFunc(wordCount):
        if wordCount[0] in excludeList.value:
            droppedWordsCounter.add(wordCount[1])
            False
        else:
            True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)

部署应用

本节讨论部署Spark Streaming应用程序的步骤。

要求

要运行Spark Streaming应用程序,您需要具备以下条件。

  1. 带有集群管理器的集群 - 这是任何Spark应用程序的通用要求,并在部署指南中进行了详细讨论。

  2. 打包应用程序JAR - 您必须将流式应用程序编译成JAR。如果您使用spark-submit启动应用程序,则不需要在JAR中提供Spark和Spark Streaming。但是,如果您的应用程序使用高级源(例如Kafka),则必须在用于部署应用程序的JAR中打包它们链接到的额外工件以及它们的依赖项。例如,使用KafkaUtils的应用程序将必须在应用程序JAR中包含spark-streaming-kafka-0-10_2.12及其所有传递依赖项。

  3. 为执行器配置足够的内存 - 由于接收到的数据必须存储在内存中,因此必须为执行器配置足够的内存来保存接收到的数据。请注意,如果您正在执行10分钟窗口操作,则系统必须至少在内存中保留过去10分钟的数据。因此,应用程序的内存需求取决于其中使用的操作。

  4. 配置检查点 - 如果流应用程序需要检查点,则必须配置Hadoop API兼容的容错存储(例如HDFS、S3等)中的目录作为检查点目录,并且流应用程序必须以一种可以使用检查点信息进行故障恢复的方式编写。有关更多详细信息,请参阅检查点部分。

  5. 配置应用程序驱动程序的自动重启 - 为了从驱动程序故障中自动恢复,运行流应用程序的部署基础架构必须监视驱动程序进程并在失败时重新启动驱动程序。不同的集群管理器具有实现此功能的不同工具。

    • Spark Standalone - Spark应用程序驱动程序可以提交到在Spark Standalone集群内运行的模式下(参见集群部署模式),也就是说,应用程序驱动程序本身在一个工作节点上运行。此外,Stand-alone集群管理器可以被指示监督驱动程序,并在驱动程序因非零退出代码或驱动程序所在节点的故障而失败时重新启动它。有关更多详细信息,请参阅Spark Standalone指南中的集群模式和supervise。
    • YARN - Yarn支持类似的机制来自动重启应用程序。请参阅YARN文档获取更多详细信息。
    • Mesos - 使用Marathon可以实现这一点。
  6. 配置预写日志 - 从Spark 1.2开始,我们引入了预写日志来实现强大的容错保证。如果启用了预写日志,则接收器接收到的所有数据都会写入配置检查点目录中的预写日志中。这样可以在驱动程序恢复时防止数据丢失,从而确保零数据丢失(在容错语义部分中详细讨论)。可以通过将配置参数spark.streaming.receiver.writeAheadLog.enable设置为true来启用此功能。然而,这种更强的语义可能会以接收器的接收吞吐量为代价。可以通过并行运行更多的接收器来纠正这个问题,以增加总吞吐量。此外,建议在启用预写日志时禁用Spark内部接收到的数据的复制,因为该日志已经存储在一个复制的存储系统中。这可以通过将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER来实现。当使用S3(或不支持刷新的任何文件系统)进行预写日志时,请记住启用spark.streaming.driver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有关更多详细信息,请参阅Spark Streaming配置。请注意,启用I/O加密时,Spark不会对写入预写日志的数据进行加密。如果需要对预写日志数据进行加密,则应将其存储在支持本地加密的文件系统中。

  7. 设置最大接收速率 - 如果集群资源不足以使流应用程序能够处理接收到的数据的速度,可以通过设置每秒记录数的最大速率来限制接收器的速率。有关接收器的配置参数spark.streaming.receiver.maxRate和直接Kafka方法的配置参数spark.streaming.kafka.maxRatePerPartition,请参阅Spark 1.5文档。我们引入了一种名为反压(backpressure)的功能,该功能消除了设置此速率限制的需要,因为Spark Streaming会自动确定速率限制,并在处理条件发生变化时动态调整它们。可以通过将配置参数spark.streaming.backpressure.enabled设置为true来启用此反压功能。

升级应用程序代码

如果正在运行的Spark Streaming应用程序需要使用新的应用程序代码进行升级,则有两种可能的机制。

  1. 启动并并行运行升级后的Spark Streaming应用程序和现有应用程序。一旦新的应用程序(接收与旧应用程序相同的数据)已经准备好并准备投入使用,就可以关闭旧应用程序。请注意,这对于支持将数据发送到两个目标(即早期和升级的应用程序)的数据源是可行的。

  2. 优雅关闭现有应用程序(参见StreamingContext.stop(…)或JavaStreamingContext.stop(…)以获取优雅关闭选项),以确保接收到的数据在关闭之前完全处理。然后可以启动升级的应用程序,它将从先前应用程序停止的地方开始处理。请注意,这只能使用支持源端缓冲区(如Kafka)的输入源完成,因为在先前的应用程序关闭时需要缓冲数据,并且升级的应用程序尚未启动。无法从先前升级代码的检查点信息重新启动。检查点信息实际上包含序列化的Scala/Java/Python对象,尝试使用新的、修改过的类反序列化对象可能会导致错误。在这种情况下,要么使用不同的检查点目录启动升级应用程序,要么删除之前的检查点目录。

监控应用程序

除了Spark的监控功能外,还有一些特定于Spark Streaming的额外功能。当使用StreamingContext时,Spark Web界面会显示一个额外的Streaming选项卡,其中显示有关正在运行的接收器(接收器是否活动、接收到的记录数、接收器错误等)和已完成批次(批处理时间、排队延迟等)的统计信息。这可以用于监视流应用程序的进度。

以下两个Web界面中的指标尤为重要:

  1. 处理时间 - 处理每个数据批次所需的时间。

  2. 调度延迟 - 批处理在队列中等待前面批次处理完成的时间。

如果批处理时间始终大于批处理间隔和/或排队延迟不断增加,则表示系统无法按照生成的速度处理批处理,并且正在落后。在这种情况下,考虑减少批处理时间。

Spark Streaming程序的进度还可以使用StreamingListener接口进行监视,该接口允许获取接收器状态和处理时间。请注意,这是一个开发人员API,并且可能会在将来改进(即报告更多信息)。

性能调优

在集群上获得最佳性能的Spark Streaming应用程序需要进行一些调整。本节解释了可以调整的一些参数和配置,以提高应用程序的性能。在高层次上,您需要考虑两个方面:

  1. 通过有效使用集群资源来减少每个数据批次的处理时间。

  2. 设置合适的批处理大小,以便数据批次能够与其接收到的速度一样快地处理(即数据处理跟上数据摄取)。

减少批处理时间

有多种优化方式可以减少每个批处理的处理时间。这些已在调优指南中进行了详细讨论。本节重点介绍其中一些最重要的优化方式。

数据接收的并行级别

通过网络接收数据(例如Kafka、socket等)需要对数据进行反序列化并存储在Spark中。如果数据接收成为系统的瓶颈,则可以并行处理数据接收。请注意,每个输入DStream创建一个接收器(在工作机器上运行),接收一个数据流。通过创建多个输入DStream并将其配置为从源接收数据流的不同分区,可以实现接收多个数据流。例如,接收两个数据主题的单个Kafka输入DStream可以拆分为两个Kafka输入流,每个只接收一个主题。这将运行两个接收器,允许并行接收数据,从而增加了总吞吐量。这些多个DStreams可以合并在一起创建一个单一的DStream。然后,可以在统一的流上应用在单个输入DStream上应用的转换。以下是示例代码。

numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()

另一个需要考虑的参数是接收器的块间隔,该间隔由配置参数spark.streaming.blockInterval确定。对于大多数接收器,接收到的数据会被合并成数据块,然后存储在Spark的内存中。每个批次中的块数决定了将用于处理接收到的数据的任务数。每个接收器每批次的任务数将大约为(批处理间隔/块间隔)。例如,200毫秒的块间隔将创建每2秒批次中的10个任务。如果任务数过低(即小于每台机器的核心数),则效率会降低,因为所有可用的核心都无法用于处理数据。为了增加给定批次间隔的任务数,请减少块间隔。然而,建议块间隔的最小值约为50毫秒,低于该值会导致任务启动开销问题。

接收多个输入流/接收器的另一种方法是显式地对输入数据流进行重分区(使用inputStream.repartition())。这将在进一步处理之前将接收到的数据批次分布到群集中指定数量的机器上。

数据处理的并行级别

如果在计算的任何阶段使用的并行任务数不够高,则可能会浪费集群资源。例如,对于reduceByKey和reduceByKeyAndWindow等分布式缩减操作,默认的并行任务数由spark.default.parallelism配置属性控制。您可以将并行级别作为参数传递(参见PairDStreamFunctions文档),或者设置spark.default.parallelism配置属性来更改默认值。

数据序列化

通过调整序列化格式,可以减少数据序列化的开销。在流处理中,有两种类型的数据被序列化。

输入数据:默认情况下,通过接收器接收的输入数据会使用StorageLevel.MEMORY_AND_DISK_SER_2存储在执行器的内存中。也就是说,数据会被序列化为字节以减少GC开销,并进行复制以容忍执行器故障。此外,数据首先存储在内存中,仅当内存不足以容纳流处理计算所需的所有输入数据时,才会溢出到磁盘上。这种序列化显然有开销 - 接收器必须将接收到的数据进行反序列化,并使用Spark的序列化格式重新序列化。

由流操作生成的持久化RDD:由流计算生成的RDD可以在内存中持久化。例如,窗口操作将数据持久化在内存中,因为它们将被多次处理。但是,与Spark核心的默认StorageLevel.MEMORY_ONLY不同,流计算生成的持久化RDD默认情况下以StorageLevel.MEMORY_ONLY_SER(即序列化)进行持久化,以最小化GC开销。

在这两种情况下,使用Kryo序列化可以减少CPU和内存开销。有关更多详细信息,请参阅Spark调优指南。对于Kryo,请考虑注册自定义类并禁用对象引用跟踪(请参阅配置指南中的与Kryo相关的配置)。

在特定情况下,如果需要保留的流应用程序的数据量不大,则可以在不产生过多GC开销的情况下将数据(两种类型)作为反序列化对象进行持久化可能是可行的。例如,如果使用几秒钟的批处理间隔和没有窗口操作,则可以尝试通过显式设置适当的存储级别来禁用持久化数据的序列化。这将减少由于序列化而产生的CPU开销,从而可能提高性能而不会产生太多的GC开销。

任务启动开销

如果每秒启动的任务数很高(例如,每秒50个或更多个任务),则发送任务到执行器的开销可能很大,并且很难实现亚秒级的延迟。可以通过以下更改来减少开销:

  1. 执行模式:与精细粒度Mesos模式相比,以独立模式或粗粒度Mesos模式运行Spark可以获得更好的任务启动时间。请参阅在Mesos上运行的指南获取更多详细信息。

这些更改可能会将批处理时间减少100毫秒左右,从而使亚秒级批处理大小成为可行。

设置合适的批处理间隔

对于在集群上运行的Spark Streaming应用程序来说,要保持稳定,系统应该能够按照生成的速度处理数据。换句话说,数据批次应该与生成的速度一样快地处理。可以通过监视流Web界面中的处理时间来验证是否对于一个应用程序是真实的。在那里,批处理时间应小于批处理间隔。

根据流计算的性质,使用的批处理间隔可能对在固定集群资源上可以持续支持的数据速率产生重大影响。例如,让我们考虑之前的WordCountNetwork示例。对于特定的数据速率,系统可以每2秒报告一次单词计数(即批处理间隔为2秒),但无法每500毫秒报告一次。因此,批处理间隔需要设置为可以持续生产的预期数据速率。

找到适合您的应用程序的正确批处理大小的好方法是使用保守的批处理间隔(例如,5-10秒)和低数据速率进行测试。要验证系统能否跟上数据速率,请检查每个处理批次的端到端延迟值(可以在Spark驱动程序的log4j日志中查找“Total delay”或使用StreamingListener接口)。如果延迟保持在与批处理大小相当的水平,则系统是稳定的。否则,如果延迟不断增加,则表示系统无法跟上并且不稳定。一旦了解了稳定的配置,可以尝试增加数据速率和/或减小批处理大小。请注意,由于临时数据速率增加可能会导致短暂的延迟增加,只要延迟降低到较低值(即小于批处理大小),那么这种情况可能是可以接受的。

内存调优

调整Spark应用程序的内存使用和GC行为已在调优指南中详细讨论。强烈建议您阅读该指南。在本节中,我们将在Spark Streaming应用程序的上下文中讨论一些特定于内存的调优参数。

一个Spark Streaming应用程序所需的集群内存量主要取决于所使用的转换类型。例如,如果要在最近10分钟的数据上使用窗口操作,则集群应具有足够的内存来在内存中保存10分钟的数据。或者,如果要使用大量键进行updateStateByKey,则所需的内存将很高。相反,如果要执行简单的映射-过滤-存储操作,则所需的内存将较低。

总体而言,由于通过接收器接收的数据存储为StorageLevel.MEMORY_AND_DISK_SER_2,默认情况下,不适合内存的数据将溢出到磁盘。这可能会降低流应用程序的性能,因此建议根据流应用程序的要求提供足够的内存。最好尝试并估算小规模上的内存使用情况。

内存调优的另一个方面是垃圾回收。对于需要低延迟的流应用程序,由于JVM垃圾回收引起的大量暂停时间是不可取的。

有一些参数可以帮助您调整内存使用和GC开销:

DStreams的持久化级别:如前所述,在数据序列化部分中,通过接收器接收的输入数据和RDD默认情况下都会以序列化字节的形式进行持久化。与反序列化的持久化相比,这减少了内存使用和GC开销。使用Kryo序列化可以进一步减少序列化大小和内存使用。可以通过压缩(请参阅Spark配置spark.rdd.compress)来进一步减少内存使用,但这会增加CPU时间。

清除旧数据:默认情况下,所有通过DStream转换生成的输入数据和持久化的RDD都会自动清除。根据使用的转换方式,Spark Streaming决定何时清除数据。例如,如果使用了10分钟的窗口操作,则Spark Streaming将保留过去的10分钟数据,并主动丢弃旧数据。通过设置streamingContext.remember可以保留更长时间的数据(例如,交互式查询旧数据)。

CMS垃圾回收器:强烈建议使用并发标记和清除(CMS)GC来保持GC相关暂停时间始终较低。即使并发GC已知会降低系统的整体处理吞吐量,但仍建议使用它以获得更一致的批处理时间。确保在驱动程序上设置CMS GC(使用spark-submit中的–driver-java-options)和执行器(使用Spark配置spark.executor.extraJavaOptions)。

其他提示:为了进一步减少GC开销,请尝试以下一些提示。

使用OFF_HEAP存储级别持久化RDD。请参阅Spark编程指南中的更多详细信息。

使用更多具有较小堆大小的执行器。这将减少每个JVM堆内部的GC压力。

重要要点:

  1. 一个DStream与一个接收器相关联。要实现读取并行性,需要创建多个接收器,即多个DStreams。接收器在执行器中运行。它占用一个核心。在预订接收器插槽(即spark.cores.max应考虑到接收器插槽)后,请确保在处理后还有足够的核心可用于处理。

  2. 当从流源接收数据时,接收器创建数据块。每隔blockInterval毫秒生成一个新的数据块。在batchInterval期间生成N个数据块,其中N = batchInterval/blockInterval。这些块由当前执行器的BlockManager分发到其他执行器的块管理器。然后,运行在驱动程序上的Network Input Tracker会根据块的位置进行通知,以进行进一步处理。

  3. 对于batchInterval期间生成的块,驱动程序上的RDD被创建。在batchInterval期间生成的块是RDD的分区。每个分区是Spark中的一个任务。生成的批次的块在RDD中,这些块是RDD的分区。RDD的处理由驱动程序的作业调度器作为作业计划来安排。在给定时间点上只有一个作业处于活动状态。因此,如果一个作业正在执行,其他作业将排队等待。

  4. 如果有两个DStreams,则会形成两个RDD,并且将创建两个作业,这些作业将依次调度。为了避免这种情况,可以合并两个DStreams。这将确保两个DStreams的RDDs形成单个unionRDD。然后,以前在单个输入DStream上应用的转换可以在统一的流上应用。示例如下:

numStreams = 2
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
  1. 如果批处理时间大于batchinterval,则可靠接收器的内存将开始填满,并且最终导致抛出异常(最可能是BlockNotFoundException)。目前,没有办法暂停接收器。可以通过设置spark.streaming.receiver.maxRate配置参数限制接收器的速率来解决此问题。

容错语义

在本节中,我们将讨论Spark Streaming应用程序在发生故障时的行为。

背景

为了理解Spark Streaming提供的语义,让我们记住Spark的RDD的基本容错语义。

  1. RDD是一个不可变的、确定性重新计算的分布式数据集。每个RDD都记住了在容错输入数据集上使用的确定性操作的血统。
  2. 如果由于工作节点故障而丢失了RDD的任何分区,则可以使用操作的血统从原始的容错数据集中重新计算该分区。
  3. 假设所有的RDD转换都是确定性的,那么最终转换后的RDD的数据将始终是相同的,无论发生什么故障。

Spark在像HDFS或S3这样的容错文件系统上操作数据。因此,所有从容错数据生成的RDD也是容错的。但是,对于Spark Streaming来说情况并非如此,因为大多数情况下数据是通过网络接收的(除非使用fileStream)。为了为所有生成的RDD提供相同的容错特性,接收到的数据会在群集中的多个Spark执行器之间复制(默认副本因子为2)。这导致系统中存在两种需要在故障事件中恢复的数据:

  1. 接收到的数据和复制的数据 - 当单个工作节点发生故障时,由于副本存在于其他节点上,所以不会丢失该分区的数据。
  2. 接收到的数据但尚未进行复制的缓冲数据 - 由于此数据未复制,因此恢复此数据的唯一方法是从源重新获取。

此外,我们应该关注两种类型的故障:

  1. 工作节点故障 - 运行执行器的任何工作节点都可能失败,并且所有内存中的数据都将丢失。如果有接收器正在运行失败的节点上,则它们的缓冲数据将丢失。
  2. 驱动程序节点故障 - 如果运行Spark Streaming应用程序的驱动程序节点失败,那么显然将丢失SparkContext和所有带有其内存中数据的执行器。

了解了这些基本知识,让我们了解Spark Streaming的容错语义。

定义

流处理系统的语义通常是根据系统对每条记录可以处理多少次来描述的。在所有可能的操作条件下(尽管存在故障等情况),系统可以提供以下三种类型的保证:

  1. 最多一次:每条记录将被处理一次或根本不处理。

  2. 至少一次:每条记录将被处理一次或多次。这比最多一次更强,因为它确保不会丢失任何数据。但可能会出现重复的情况。

  3. 正好一次:每条记录将被处理一次 - 不会丢失任何数据,也不会多次处理。这显然是三种保证中最强的保证。

基本语义

在任何流处理系统中,广义上来说,处理数据有三个步骤。

  1. 接收数据:使用接收器或其他方式从源接收数据。

  2. 转换数据:使用DStream和RDD转换来转换接收到的数据。

  3. 推送数据:将最终转换后的数据推送到外部系统,如文件系统、数据库、仪表板等。

如果一个流处理应用程序必须实现端到端的正好一次保证,那么每个步骤都必须提供正好一次的保证。也就是说,每条记录必须准确地接收一次、准确地转换一次,并且准确地推送到下游系统一次。让我们在Spark Streaming的上下文中理解这些步骤的语义。

  1. 接收数据:不同的输入源提供不同的保证。这在下一小节中进行了详细讨论。

  2. 转换数据:由于RDD提供的保证,所有已接收的数据将精确地处理一次。即使发生故障,只要接收到的输入数据可访问,最终转换后的RDD始终具有相同的内容。

  3. 推送数据:输出操作默认提供至少一次的语义,这取决于输出操作类型(幂等还是非幂等)和下游系统的语义(是否支持事务)。但是用户可以实现自己的事务机制以实现正好一次的语义。这在本节后面详细讨论。

接收到的数据的语义

不同的输入源提供不同的保证,从至少一次到仅一次。阅读更多细节。

使用文件

如果所有的输入数据已经存在于像HDFS这样的容错文件系统中,Spark Streaming可以始终从任何故障中恢复并处理所有数据。这提供了仅一次的语义,意味着无论发生什么故障,所有的数据都将被处理一次。

使用基于接收器的数据源

对于基于接收器的输入源,容错语义取决于故障场景和接收器的类型。有两种类型的接收器:

  • 可靠接收器:这些接收器只在确保接收到的数据已经被复制后才确认可靠的源。如果这样的接收器失败,源将不会收到未复制的数据的确认。因此,如果接收器重新启动,源将重新发送数据,没有数据会因为故障而丢失。
  • 不可靠接收器:这些接收器不发送确认,因此当它们由于工作节点或驱动程序的故障而失败时,可能会丢失数据。

根据所使用的接收器类型,我们实现以下语义:

  • 如果工作节点失败,则可靠接收器不会丢失数据。对于不可靠接收器,接收到但未复制的数据可能会丢失。
  • 如果驱动程序节点失败,则除了上述情况,所有在内存中接收并复制的过去数据将丢失。这将影响有状态的转换的结果。

为了避免丢失过去接收的数据,Spark 1.2引入了预写式日志(write ahead logs),它将接收到的数据保存到容错存储中。启用预写式日志和可靠接收器,将不会丢失任何数据。在语义上,它提供了至少一次的保证

下表总结了在故障情况下的语义:

部署场景 工作节点故障 驱动程序故障
Spark 1.1或更早版本, 或者
Spark 1.2或之后没有启用预写式日志
不可靠接收器丢失缓冲数据 可靠接收器无数据丢失
Spark 1.2或之后启用预写式日志 可靠接收器无数据丢失 可靠接收器和文件无数据丢失
使用Kafka Direct API

在Spark 1.3中,我们引入了新的Kafka Direct API,可以确保所有Kafka数据被Spark Streaming准确接收一次。除此之外,如果实现了确切一次输出操作,还可以实现端到端的确切一次保证。这种方法在Kafka集成指南中进一步讨论。

输出操作的语义

输出操作(如foreachRDD)具有至少一次的语义,也就是说,在工作节点故障的情况下,转换后的数据可能会多次写入外部实体。对于保存到文件系统的操作,这是可接受的(因为文件将简单地用相同的数据进行覆盖),但为了实现确切一次的语义可能需要额外的努力。有两种方法:

  • 幂等更新:多次尝试始终写入相同的数据。例如,saveAs***Files总是将相同的数据写入生成的文件。
  • 事务性更新:所有的更新都以事务方式进行,使得更新只会原子地进行一次。可以通过以下方式来实现:
    • 使用批处理时间和RDD的分区索引创建一个标识符,唯一地标识流式应用程序中的一个块数据。
    • 使用该标识符以事务方式更新外部系统,即如果标识符尚未提交,则原子地提交分区数据和标识符;否则,跳过更新。
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // 使用这个uniqueId以事务方式提交partitionIterator中的数据
  }
}

常看spark文档

kafka streaming流数据处理遇到的问题实战总结系列

Spark Dataset DataFrame原理方法示例源码分析

Spark SparkSession由来方法示例源码分析

Spark RDD由来原理方法示例源码详解

Spark SparkContext原理用法示例源码详解

Spark SQL示例用法所有函数权威详解

SparkSQL性能调优官网权威资料

spark dataset/DataFrame比RDD好在哪些地方?

Spark YARN Cluster和Client两种不同提交模式区别

spark RDD 概述用法官网权威资料(建议收藏)

Spark on k8s如何在Kubernetes运行官方权威资料

Spark性能权威调优指南

Spark提交任务官网权威指南详解(建议收藏)

Spark数据类型官网权威详解

Spark 集群模式概述

在YARN上启动Spark任务原理用法官方权威资料

Spark SparkContext原理用法示例源码详解

Spark SQL示例用法所有函数权威详解

SparkSQL性能调优官网权威资料

spark dataset/DataFrame比RDD好在哪些地方?

Spark YARN Cluster和Client两种不同提交模式区别

spark RDD 概述用法官网权威资料(建议收藏)

Spark on k8s如何在Kubernetes运行官方权威资料

Spark性能权威调优指南

Spark提交任务官网权威指南详解(建议收藏)

Spark数据类型官网权威详解

Spark 集群模式概述

在YARN上启动Spark任务原理用法官方权威资料

spark SQL Implicits原理用法示例源码分析文章来源地址https://www.toymoban.com/news/detail-759870.html

到了这里,关于Spark Streaming 编程权威使用指南的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Apache Spark处理Excel文件的简易指南

    在日常的工作中,表格内的工具是非常方便的x,但是当表格变得非常多的时候,就需要一些特定的处理。Excel作为功能强大的数据处理软件,广泛应用于各行各业,从企业管理到数据分析,可谓无处不在。然而,面对大型且复杂的数据,Excel的处理能力可能力不从心。 对此,

    2024年01月19日
    浏览(46)
  • 大数据编程实验四:Spark Streaming

    一、目的与要求 1、通过实验掌握Spark Streaming的基本编程方法; 2、熟悉利用Spark Streaming处理来自不同数据源的数据。 3、熟悉DStream的各种转换操作。 4、熟悉把DStream的数据输出保存到文本文件或MySQL数据库中。 二、实验内容 1.参照教材示例,利用Spark Streaming对三种类型的基

    2024年02月03日
    浏览(53)
  • 实验四 Spark Streaming编程初级实践

    数据流  :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。 1.下载安装包 https://www.apache.org/dyn/closer.lua/flume/

    2024年04月26日
    浏览(47)
  • 【Spark编程基础】第7章 Structured Streaming

    7.1.1 基本概念 Structured Streaming的关键思想是将实时数据流视为一张正在不断添加数据的表 可以把流计算等同于在一个静态表上的批处理查询,Spark会在不断添加数据的无界输入表上运行计算,并进行增量查询 在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发

    2024年02月08日
    浏览(57)
  • Spark Structured Streaming使用教程

    Structured Streaming是一个基于Spark SQL引擎的可扩展和容错流处理引擎,Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。 Structured Streaming把持续不断的流式数据当做一个不断追加的表,这使得新的流处理模型与批处理模型非常相似。您将把流计算表

    2024年02月03日
    浏览(56)
  • spark介绍之spark streaming

    Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、w

    2024年02月02日
    浏览(34)
  • 在Spring Boot中使用Spark Streaming进行实时数据处理和流式计算

    引言: 在当今大数据时代,实时数据处理和流式计算变得越来越重要。Apache Spark作为一个强大的大数据处理框架,提供了Spark Streaming模块,使得实时数据处理变得更加简单和高效。本文将深入浅出地介绍如何在Spring Boot中使用Spark Streaming进行实时数据处理和流式计算,并提供

    2024年03月27日
    浏览(48)
  • Spark的生态系统概览:Spark SQL、Spark Streaming

    Apache Spark是一个强大的分布式计算框架,用于大规模数据处理。Spark的生态系统包括多个组件,其中两个重要的组件是Spark SQL和Spark Streaming。本文将深入探讨这两个组件,了解它们的功能、用途以及如何在Spark生态系统中使用它们。 Spark SQL是Spark生态系统中的一个核心组件,它

    2024年02月01日
    浏览(39)
  • Spark面试整理-解释Spark Streaming是什么

    Spark Streaming是Apache Spark的一个组件,它用于构建可扩展、高吞吐量、容错的实时数据流处理应用。Spark Streaming使得可以使用Spark的简单编程模型来处理实时数据。以下是Spark Streaming的一些主要特点: 1. 微批处理架构 微批处理: Spark Streaming的核心是微批处理模型。它将实

    2024年04月13日
    浏览(47)
  • [Spark、hadoop]Spark Streaming整合kafka实战

    目录 一.KafkaUtils.createDstream方式 二.KafkaUtils.createDirectStream方式  温习 Kafka是由Apache软件基金会开发的一个开源流处理平台,它使用Scala和Java语言编写,是一个基于Zookeeper系统的分布式发布订阅消息系统,该项目的设计初衷是为实时数据提供一个统一、高通量、低等待的消息

    2024年01月21日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包