带你从Spark官网啃透Spark Structured Streaming

这篇具有很好参考价值的文章主要介绍了带你从Spark官网啃透Spark Structured Streaming。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

By 远方时光原创,可转载,open

合作微信公众号:大数据左右手

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

本文是基于spark官网结构化流解读

Structured Streaming Programming Guide - Spark 3.5.1 Documentation (apache.org)

spark官网对结构化流解释

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

我浓缩了一些关键信息:

1.结构化流是基于SparkSQL引擎构建的可扩展且容错的流处理引擎。(也就是他摒弃了DStream)

2.可以像批数据一样处理流数据。可以使用Dataset/DataFrame API在Scala、Java、Python或R中流聚合、事件时窗口、流批数据join等操作。(sparksql处理的是静态的有界表,sparkstreaming 处理的是动态无界表)

3.通过检查点预写日志确保端到端精确一次容错保证。(一条数据只被消费一次)

4.默认结构化流查询使用微批次处理作业引擎进行处理,并实现低至100毫秒的端到端延迟和精确一次的容错保证。

5.自Spark 2.3,引入了一种新的更低延迟处理模式,称为连续处理,它可以实现低至1毫秒的端到端延迟,并保证至少一次。(这个延迟基本和flink处理流无区别了)

基本概念:

输入表

可以抽象的认为:消费的流数据,源源不断的追加到一张无界表中。

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

输出表

处理后的结果,比如下图中groupby($"word").count()

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

输出模式:

·完成模式(complete)整个更新的结果表将被写入外部存储。全部输出,必须要有聚合

time1:

输入表:''cat dog dog dog''

-> groupby($"word").count()

-> 结果表输出:cat 1, dog 3

time2:

新增消息 "owl cat"

-> groupby($"word").count()

-> 结果表输出:cat 2, dog 3, owl 1

time3:

新增消息 "dog owl"

-> groupby($"word").count()

-> 结果表输出:cat 2, dog 4, owl 2

·追加模式(apend):自上次触发器以来,追加到结果表中的新增的行才会写入外部存储。仅适用于结果表中现有行预计不会更改。

time1:

输入表:''cat dog'' -> 不处理 -> 结果表输出:cat, dog

time2:

新增消息 ''fish'' -> 不处理 -> 结果表输出:fish

·更新模式(update):自上次触发器以来,在结果表中更新的行才会写入外部存储(自Spark2.1.1起可用)。如果查询不包含聚合,则相当于追加模式。

time1:

输入表:''cat dog dog dog''

-> groupby($"word").count()

-> 结果表输出:cat 1, dog 3

time2:

新增消息 "owl cat"

-> groupby($"word").count()

-> 结果表输出:cat 2, owl 1 (变化和新增输出,dog 3对比time1无变化不输出)

处理事件时间

{''id'':''8888888'', ''time'':''2024-03-04 19:36:30'',''data'':''****''}

事件时间是嵌入在数据本身中的时间spark允许基于eventTime窗口聚合

时间窗口:

滚动窗口:窗口无重合,window($"timestamp", "5 minutes", "5 minutes")

滑动窗口:窗口有重合,window($"timestamp", "10 minutes", "5 minutes")

会话窗口:设有一个时间间隔(5分钟),结合下图看,12:09分后面5分钟,都没收到新数据,所以在12:14分窗口关闭

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

水位线解决延迟数据  (超级重点,面试爱问)

从 Spark 2.1 开始,支持水印或者叫水位线(watermark),一种窗口关闭延迟机制,用于解决部分乱序数据

官网写的太长,我简化一下,你对着图看:

注:④抽象为一条数据(其事件时间为12:04的)

水位线 = 曾经消费过最迟事件时间(max eventTime) - 允许延迟的时间(threshold)

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

1)消费到④,拉倒0~5s窗口,watermark=4 - 3 = 1

2)消费到⑥,拉到5~10s窗口,watermark=6 - 3 = 3

3)消费到⑤,拉到5~10s窗口,⑤正常是会⑥之前被消费到,此时出现乱序,⑤它晚到了

如果没有设置水位线,消费到⑥的时候0~5s窗口就应该被关闭,⑤丢失

但是我们设置了3s水位线延迟机制,

此时水位线watermark = 6 - 3 = 3 (曾经消费过最迟eventTime是⑥ - 3,而不是⑤ - 3),抽象理解为水位线只会上涨,不会下降

因为水位线机制,晚到的⑤仍然可以进入到0~5s窗口

只有当水位线>=5,这里5指的是开窗的(0,5]右区间,0~5s窗口才会关闭

4)消费到⑧,拉倒5~10s窗口,watermark=8 - 3 = 5,那么0~5s窗口此时正式关闭

5)消费到③,0~5s窗口已经关闭,这条数据晚太多了,被丢失掉了,所以尽管设置水位线还是会有数据丢失。

水位线用来鉴别延迟数据的有效性:在水位线以内的数据都是有效数据参与窗口的计算,水位线以外的数据则为过期数据丢弃 

检查点机制保证端到端精确一次消费(checkpoint) <--重点

在 Spark Structured Streaming 中,通过使用 checkpoint 可以确保端到端的精确一次语义和容错。

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

val spark = SparkSession.builder.appName("YourAppName").getOrCreate()

// 设置检查点目录
spark.conf.set("spark.checkpoint.directory", "/path/to/checkpoint")

// 创建 Event Hubs 配置
val eventHubsConf = Map(
  "eventhubs.connectionString" -> "your_eventhubs_connection_string",
  "eventhubs.consumerGroup" -> "your_consumer_group",
  "eventhubs.name" -> "your_eventhubs_name"
)

val streamingQuery = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf)
  .option("startingOffsets", "latest")  // or "earliest" based on your requirements
  .load()
  // Perform transformations, aggregations, etc.
  .writeStream
  .outputMode("update")  
  .format("console")    
  .option("checkpointLocation", "/path/to/checkpoint") // <---checkpoint 记录消费的offset
  .start()

streamingQuery.awaitTermination()

在 `options` 中,可以使用 `startingOffsets` 参数来指定从哪个 offset 开始读取数据。你可以将其设置为 `"latest"` 或 `"earliest"`,具体取决于你的需求。

确保检查点目录是在可靠的文件系统上。这样,在应用程序重新启动时,Spark Streaming 将能够恢复到上次处理的状态,从上一次记录的 offset 开始读取数据,实现端到端的精确一次语义和容错

真实checkpoint储到路径是什么样的

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

预写日志WAL(write ahead log)机制来保证精确一次消费(重点)

3.通过检查点预写日志WAL确保端到端精确一次容错保证。(一条数据只被消费一次)

思考一个问题我只需要checkpoint记录我的offset就可以保证精确一次消费吗?

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

至多一次消费:

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

至少一次消费:

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

精确一次消费:

两种方式:

1.使用后提交offset保证至少一次(数据重复)+幂等性,也就是写入目的地支持去重,来去处重复数据

2.另外一种就是保证提交offset输出数据同时成功,或者同时失败,也就是事务。预写日志机制(WAL)就是这么做的。

带你从Spark官网啃透Spark Structured Streaming,spark,大数据,分布式

WAL(Write Ahead Log)预写日志,是数据库系统中常见的一种手段,用于保证数据操作的原子性和持久性

「预写式日志」(Write-ahead logging,缩写 WAL)是关系数据库系统中用于提供原子性和持久性(ACID 属性中的两个)的一系列技术。在使用 WAL 的系统中,所有的修改在提交之前都要先写入 log 文件中。

log 文件中通常包括 redo 和 undo 信息(redo重做,undo撤销还原)。这样做的目的可以通过一个例子来说明。假设一个程序在执行某些操作的过程中机器掉电了。在重新启动时,程序可能需要知道当时执行的操作是成功了还是部分成功或者是失败了。如果使用了 WAL,程序就可以检查 log 文件,并对突然掉电时计划执行的操作内容跟实际上执行的操作内容进行比较。在这个比较的基础上,程序就可以决定是撤销已做的操作还是继续完成已做的操作,或者是保持原样。

我抽象理解:有一个地方记录了log,用于记录这一次的offset和写出数据是否都成功,如果两个中有一次没成功,成功的那一个回滚(撤销已做的操作),在程序重启的时候先检查一下WAL文件,看看我是否需要回滚,来保证原子性。

官网原文:

The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.

Checkpiont+WAL记录offset,回滚

还有一个流接收器来让数据幂等性(没找的相关介绍和科普),我问了一下chatGPT:

预写式日志(WAL):

  • WAL提供持久性和原子性,并确保即使在处理数据但在元数据检查点之前发生故障时,系统也可以通过回滚到已记录的信息进行恢复。

幂等性的接收端:

  • Structured Streaming中的流接收端被设计为幂等(key去重)。
  • 幂等的接收端确保如果系统由于故障而需要重新处理数据,不会导致重复或不正确的输出。
  • 通过使接收端具有幂等性,系统可以安全地回滚数据而不会引入不一致性。

2024-3-7 0:18 写不动了,明天看看有什么继续写的 

觉得不错的点赞收藏一下文章来源地址https://www.toymoban.com/news/detail-847764.html

到了这里,关于带你从Spark官网啃透Spark Structured Streaming的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 结构化流(Structured Streaming)

    有界数据: 无界数据: 结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL … Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针

    2024年01月17日
    浏览(41)
  • Structured_Streaming和Kafka整合

    默认情况下,Spark的结构化流支持多种输出方案: File Sink foreach sink 允许对输出的数据进行任意的处理操作,具体如何处理由用户自定义函数决定。对输出的数据一个个进行处理操作。 使用方式主要有二种 方式一: 方式二:这种方式的适用场景是需要和资源打交道的情况(

    2024年01月19日
    浏览(46)
  • pyspark之Structured Streaming file文件案例1

    # generate_file.py  # 生成数据 生成500个文件,每个文件1000条数据 # 生成数据格式:eventtime name province action ()时间 用户名 省份 动作) import os  import time import shutil import time FIRST_NAME = [\\\'Zhao\\\', \\\'Qian\\\', \\\'Sun\\\', \\\'Li\\\', \\\'Zhou\\\', \\\'Wu\\\', \\\'Zheng\\\', \\\'Wang\\\'] SECOND_NAME = [\\\'San\\\', \\\'Si\\\', \\\'Wu\\\', \\\'Chen\\\', \\\'Yang\\\', \\\'Min\\\', \\\'Jie\\\', \\\'Qi

    2024年01月21日
    浏览(28)
  • 手把手带你啃透比特币白皮书-摘要

    很多人虽然了解了区块链,也可能参与了一些项目,但是可能没有见过比特币白皮书,也没有读过。我接下来就要和大家聊一聊,什么是白皮书,尤其是来给大家精读一下比特币的白皮书。 通过比特币白皮书,你能够 了解到真正的白皮书应该是什么样形式的 。因为很多人可

    2024年02月02日
    浏览(38)
  • spark介绍之spark streaming

    Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、w

    2024年02月02日
    浏览(27)
  • Spark(39):Streaming DataFrame 和 Streaming DataSet 输出

    目录 0. 相关文章链接 1. 输出的选项 2. 输出模式(output mode) 2.1. Append 模式(默认) 2.2. Complete 模式 2.3. Update 模式 2.4. 输出模式总结 3. 输出接收器(output sink) 3.1. file sink 3.2. kafka sink 3.2.1. 以 Streaming 方式输出数据 3.2.2. 以 batch 方式输出数据 3.3. console sink 3.4. memory sink 3.5. fo

    2024年02月13日
    浏览(30)
  • Spark编程实验四:Spark Streaming编程

    目录 一、目的与要求 二、实验内容 三、实验步骤 1、利用Spark Streaming对三种类型的基本数据源的数据进行处理 2、利用Spark Streaming对Kafka高级数据源的数据进行处理 3、完成DStream的两种有状态转换操作 4、把DStream的数据输出保存到文本文件或MySQL数据库中 四、结果分析与实验

    2024年02月03日
    浏览(29)
  • Spark面试整理-解释Spark Streaming是什么

    Spark Streaming是Apache Spark的一个组件,它用于构建可扩展、高吞吐量、容错的实时数据流处理应用。Spark Streaming使得可以使用Spark的简单编程模型来处理实时数据。以下是Spark Streaming的一些主要特点: 1. 微批处理架构 微批处理: Spark Streaming的核心是微批处理模型。它将实

    2024年04月13日
    浏览(34)
  • 什么是Dapp?带你从零开始搭建一个Dapp

    前言:Dapp就是去中心化应用,它和我们平时使用的App(微信,支付宝等)只差了一个去中心化,如何理解这一去中心化?从体验层面来说:Dapp中并没有管理者,大家都是平等的,互相监督;而从技术层面来说:传统的App和部署在服务器的后端产生交互,而Dapp则是和部署在区

    2024年02月05日
    浏览(34)
  • Spark的生态系统概览:Spark SQL、Spark Streaming

    Apache Spark是一个强大的分布式计算框架,用于大规模数据处理。Spark的生态系统包括多个组件,其中两个重要的组件是Spark SQL和Spark Streaming。本文将深入探讨这两个组件,了解它们的功能、用途以及如何在Spark生态系统中使用它们。 Spark SQL是Spark生态系统中的一个核心组件,它

    2024年02月01日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包