【Spark编程基础】第7章 Structured Streaming

这篇具有很好参考价值的文章主要介绍了【Spark编程基础】第7章 Structured Streaming。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录



前言


第7章 Structured Streaming

7.1 概述

7.1.1 基本概念

  • Structured Streaming的关键思想是将实时数据流视为一张正在不断添加数据的表
  • 可以把流计算等同于在一个静态表上的批处理查询,Spark会在不断添加数据的无界输入表上运行计算,并进行增量查询
    【Spark编程基础】第7章 Structured Streaming
  • 在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发对无界表的计算并更新结果表
    【Spark编程基础】第7章 Structured Streaming

7.1.2 两种处理模型

(1)微批处理

  • Structured Streaming默认使用微批处理执行模型,这意味着Spark流计算引擎会定期检查流数据源,并对自上一批次结束后到达的新数据执行批量查询
  • 数据到达和得到处理并输出结果之间的延时超过100毫秒
    【Spark编程基础】第7章 Structured Streaming

(2)持续处理

  • Spark从2.3.0版本开始引入了持续处理的试验性功能,可以实现流计算的毫秒级延迟
  • 在持续处理模式下,Spark不再根据触发器来周期性启动任务,而是启动一系列的连续读取、处理和写入结果的长时间运行的任务
    【Spark编程基础】第7章 Structured Streaming

7.1.3 Structured Streaming 和 Spark SQL、Spark Streaming 关系

  • Structured Streaming处理的数据跟Spark Streaming一样,也是源源不断的数据流,区别在于,Spark Streaming采用的数据抽象是DStream(本质上就是一系列RDD),而Structured Streaming采用的数据抽象是DataFrame。
  • Structured Streaming可以使用Spark SQL的DataFrame/Dataset来处理数据流。虽然Spark SQL也是采用DataFrame作为数据抽象,但是,Spark SQL只能处理静态的数据,而Structured Streaming可以处理结构化的数据流。这样,Structured Streaming就将Spark SQL和Spark Streaming二者的特性结合了起来。
  • Structured Streaming可以对DataFrame/Dataset应用前面章节提到的各种操作,包括select、where、groupBy、map、filter、flatMap等。
  • Spark Streaming只能实现秒级的实时响应,而Structured Streaming由于采用了全新的设计方式,采用微批处理模型时可以实现100毫秒级别的实时响应,采用持续处理模型时可以支持毫秒级的实时响应。

7.2 编写Structured Streaming程序的基本步骤

  • 编写Structured Streaming程序的基本步骤包括:
    • 导入pyspark模块
    • 创建SparkSession对象
    • 创建输入数据源
    • 定义流计算过程
    • 启动流计算并输出结果
  • 实例任务:
    • 一个包含很多行英文语句的数据流源源不断到达,
    • Structured Streaming程序对每行英文语句进行拆分,并统计每个单词出现的频率

1.步骤1:导入pyspark模块

  • 导入PySpark模块,代码如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

由于程序中需要用到拆分字符串和展开数组内的所有单词的功能,所以引用了来自pyspark.sql.functions里面的split和explode函数。

2.步骤2:创建SparkSession对象

  • 创建一个SparkSession对象,代码如下:
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()
 
    spark.sparkContext.setLogLevel('WARN')

3.步骤3:创建输入数据源

  • 创建一个输入数据源,从“监听在本机(localhost)的9999端口上的服务”那里接收文本数据,具体语句如下:
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

4.步骤4:定义流计算过程

  • 有了输入数据源以后,接着需要定义相关的查询语句,具体如下:
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)
wordCounts = words.groupBy("word").count()

5.步骤5:启动流计算并输出结果

  • 定义完查询语句后,下面就可以开始真正执行流计算,具体语句如下:
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="8 seconds") \
    .start()
 
query.awaitTermination()

【Spark编程基础】第7章 Structured Streaming
【Spark编程基础】第7章 Structured Streaming

7.3 输入源

  • File源(或称为“文件源”)以文件流的形式读取某个目录中的文件,支持的文件格式为csv、json、orc、parquet、text等。
  • 需要注意的是,文件放置到给定目录的操作应当是原子性的,即不能长时间在给定目录内打开文件写入内容,而是应当采取大部分操作系统都支持的、通过写入到临时文件后移动文件到给定目录的方式来完成。

7.3.1 File源

  • File源(或称为“文件源”)以文件流的形式读取某个目录中的文件,支持的文件格式为csv、json、orc、parquet、text等。
  • 需要注意的是,文件放置到给定目录的操作应当是原子性的,即不能长时间在给定目录内打开文件写入内容,而是应当采取大部分操作系统都支持的、通过写入到临时文件后移动文件到给定目录的方式来完成。
  • 一个实例:
    • 这里以一个JSON格式文件的处理来演示File源的使用方法,主要包括以下两个步骤:
      • 创建程序生成JSON格式的File源测试数据
      • 创建程序对数据进行统计

(1)创建程序生成JSON格式的File源测试数据

  • 为了演示JSON格式文件的处理,这里随机生成一些JSON格式的文件来进行测试。
  • 代码文件spark_ss_filesource_generate.py内容如下:
    【Spark编程基础】第7章 Structured Streaming
    【Spark编程基础】第7章 Structured Streaming
    【Spark编程基础】第7章 Structured Streaming
    【Spark编程基础】第7章 Structured Streaming
  • 这段程序首先建立测试环境,清空测试数据所在的目录,接着使用for循环一千次来生成一千个文件,
  • 文件名为“e-mall-数字.json”,
  • 文件内容是不超过100行的随机JSON行,行的格式是类似如下:
    • {“eventTime”: 1546939167, “action”: “logout”, “district”: “fujian”}\n

(2)创建程序对数据进行统计

  • spark_ss_filesource.py”,其代码内容如下:
    【Spark编程基础】第7章 Structured Streaming
    【Spark编程基础】第7章 Structured Streaming
    【Spark编程基础】第7章 Structured Streaming
    【Spark编程基础】第7章 Structured Streaming

(3)测试运行程序
【Spark编程基础】第7章 Structured Streaming文章来源地址https://www.toymoban.com/news/detail-475765.html

7.3.2 Kafka源

7.3.3 Socket源

7.3.4 Rate源

7.3.1 File源

7.3.2 Kafka源

7.3.3 Socket源

7.3.4 Rate源

7.4 输出操作

7.5 容错处理(自学)

7.6 迟到数据处理(自学)

7.7 查询的管理和监控(自学)


总结

到了这里,关于【Spark编程基础】第7章 Structured Streaming的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark编程实验四:Spark Streaming编程

    目录 一、目的与要求 二、实验内容 三、实验步骤 1、利用Spark Streaming对三种类型的基本数据源的数据进行处理 2、利用Spark Streaming对Kafka高级数据源的数据进行处理 3、完成DStream的两种有状态转换操作 4、把DStream的数据输出保存到文本文件或MySQL数据库中 四、结果分析与实验

    2024年02月03日
    浏览(35)
  • Spark Streaming 编程权威使用指南

    注意:本文档为Spark的旧版本Streaming引擎。Spark Streaming 不再更新,是一个遗留项目。在Spark中有一种新的、更易用的流处理引擎,称为结构化流式处理。您应该使用Spark结构化流处理来开发流式应用和流水线。请参阅结构化流式处理编程指南。 Spark Streaming 是 Spark 核心 API 的扩

    2024年02月04日
    浏览(53)
  • 大数据编程实验四:Spark Streaming

    一、目的与要求 1、通过实验掌握Spark Streaming的基本编程方法; 2、熟悉利用Spark Streaming处理来自不同数据源的数据。 3、熟悉DStream的各种转换操作。 4、熟悉把DStream的数据输出保存到文本文件或MySQL数据库中。 二、实验内容 1.参照教材示例,利用Spark Streaming对三种类型的基

    2024年02月03日
    浏览(51)
  • 实验四 Spark Streaming编程初级实践

    数据流  :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。 1.下载安装包 https://www.apache.org/dyn/closer.lua/flume/

    2024年04月26日
    浏览(46)
  • 结构化流(Structured Streaming)

    有界数据: 无界数据: 结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL … Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针

    2024年01月17日
    浏览(54)
  • Structured_Streaming和Kafka整合

    默认情况下,Spark的结构化流支持多种输出方案: File Sink foreach sink 允许对输出的数据进行任意的处理操作,具体如何处理由用户自定义函数决定。对输出的数据一个个进行处理操作。 使用方式主要有二种 方式一: 方式二:这种方式的适用场景是需要和资源打交道的情况(

    2024年01月19日
    浏览(57)
  • pyspark之Structured Streaming file文件案例1

    # generate_file.py  # 生成数据 生成500个文件,每个文件1000条数据 # 生成数据格式:eventtime name province action ()时间 用户名 省份 动作) import os  import time import shutil import time FIRST_NAME = [\\\'Zhao\\\', \\\'Qian\\\', \\\'Sun\\\', \\\'Li\\\', \\\'Zhou\\\', \\\'Wu\\\', \\\'Zheng\\\', \\\'Wang\\\'] SECOND_NAME = [\\\'San\\\', \\\'Si\\\', \\\'Wu\\\', \\\'Chen\\\', \\\'Yang\\\', \\\'Min\\\', \\\'Jie\\\', \\\'Qi

    2024年01月21日
    浏览(33)
  • spark介绍之spark streaming

    Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、w

    2024年02月02日
    浏览(32)
  • Spark(39):Streaming DataFrame 和 Streaming DataSet 输出

    目录 0. 相关文章链接 1. 输出的选项 2. 输出模式(output mode) 2.1. Append 模式(默认) 2.2. Complete 模式 2.3. Update 模式 2.4. 输出模式总结 3. 输出接收器(output sink) 3.1. file sink 3.2. kafka sink 3.2.1. 以 Streaming 方式输出数据 3.2.2. 以 batch 方式输出数据 3.3. console sink 3.4. memory sink 3.5. fo

    2024年02月13日
    浏览(34)
  • Spark面试整理-解释Spark Streaming是什么

    Spark Streaming是Apache Spark的一个组件,它用于构建可扩展、高吞吐量、容错的实时数据流处理应用。Spark Streaming使得可以使用Spark的简单编程模型来处理实时数据。以下是Spark Streaming的一些主要特点: 1. 微批处理架构 微批处理: Spark Streaming的核心是微批处理模型。它将实

    2024年04月13日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包