结构化流(Structured Streaming)

这篇具有很好参考价值的文章主要介绍了结构化流(Structured Streaming)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

结构化流介绍

有界和无界数据

  • 有界数据:
指的数据有固定的开始和固定的结束,数据大小是固定。我们称之为有界数据。对于有界数据,一般采用批处理方案(离线计算)

特点:
	1-数据大小是固定
	2-程序处理有界数据,程序最终一定会停止
  • 无界数据:
指的数据有固定的开始,但是没有固定的结束。我们称之为无界数据
对于无界数据,我们一般采用流式处理方案(实时计算)

特点:
 	1-数据没有明确的结束,也就是数据大小不固定
 	2-数据是源源不断的过来
 	3-程序处理无界数据,程序会一直运行不会结束

基本介绍

结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL …

Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次

​ 真正的流处理引擎: Flink、Storm(早期流式处理引擎)、Flume(流式数据采集)

实时数据案例–词频统计

需求:结构化流(Structured Streaming),sql,大数据,spark,kafka
代码实现:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('structured_streaming_wordcount')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    # 3- 数据处理
    result_df = init_df.select(
        F.explode(F.split('value',' ')).alias('word')
    ).groupBy('word').agg(
        F.count('word').alias('cnt')
    )

    # init_df.show()

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()

程序运行结果:
结构化流(Structured Streaming),sql,大数据,spark,kafka
代码测试操作步骤:

首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据
yum -y install nc
	
执行nc命令, 开启端口号, 写入数据:
nc -lk 55555

注意: 要先启动nc,再启动我们的程序

查看端口号是否被使用命令:
netstat -nlp | grep 要查询的端口

结构化流(Structured Streaming),sql,大数据,spark,kafka
可能遇到的错误:
结构化流(Structured Streaming),sql,大数据,spark,kafka

结构化流的编程模型

数据结构

结构化流(Structured Streaming),sql,大数据,spark,kafka
在结构化流中,我们可以将DataFrame称为无界的DataFrame或者无界的二维表

数据源部分

结构化流默认提供了多种数据源,从而可以支持不同的数据源的处理工作。目前提供了如下数据源:

  • Socket Source:网络套接字数据源,一般用于测试。也就是从网络上消费/读取数据
  • File Source:文件数据源。读取文件系统,一般用于测试。如果文件夹下发生变化,有新文件产生,那么就会触发程序的运行
  • Kafka Source:Kafka数据源。也就是作为消费者来读取Kafka中的数据。一般用于生产环境。
  • Rate Source:速率数据源。一般用于测试。通过配置参数,由结构化流自动生成测试数据。## Operation操作

对应官网文档内容:https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources
结构化流(Structured Streaming),sql,大数据,spark,kafka
File Source
将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet…
相关的参数:

option参数 描述说明
maxFilesPerTrigger 每次触发时要考虑的最大新文件数 (默认: no max)
latestFirst 是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false)
fileNameOnly 是否检查新文件只有文件名而不是完整路径(默认值:false)将此设置为 true 时,以下文件将被视为同一个文件,因为它们的文件名“dataset.txt”相同: “file:///dataset.txt” “s3://a/dataset.txt " “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt”

读取代码通用格式:

sparksession.readStream
	.format('CSV|JSON|Text|Parquet|ORC...')
	.option('参数名1','参数值1')
	.option('参数名2','参数值2')
	.option('参数名N','参数值N')
	.schema(元数据信息)
	.load('需要监听的目录地址')
	
针对具体数据格式,还有对应的简写API格式,例如:
	sparksession.readStream.csv(path='需要监听的目录地址',schema=元数据信息。。。)

代码操作

import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions","1")\
        .appName('file_source')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入:File Source文件数据源
    """
        File Source总结
            1- 只能监听目录,不能监听具体的文件
            2- 可以通过*通配符的形式监听目录中满足条件的文件
            3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况
    """
    init_df = spark.readStream.csv(
        path="file:///export/data/",
        sep=",",
        encoding="UTF-8",
        schema="id int,name string"
    )


    # 3- 数据处理
    # 4- 数据输出
    # 5- 启动流式任务
    init_df.writeStream.format("console").outputMode("append").start().awaitTermination()

可能遇到的错误一:
结构化流(Structured Streaming),sql,大数据,spark,kafka

原因: 如果是文件数据源,需要手动指定schema信息

可能遇到的错误二:
结构化流(Structured Streaming),sql,大数据,spark,kafka

原因: File source只能监听目录,不能监听具体文件
文件数据源特点:
1- 不能够监听具体的文件,否则会报错误java.lang.IllegalArgumentException: Option 'basePath' must be a directory
2- 可以通过通配符的形式,来监听目录下的文件,符合要求的才会被读取
3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况

Operations操作

指的是数据处理部分,该操作和Spark SQL中是完全一致。可以使用SQL方式进行处理,也可以使用DSL方式进行处理。

Sink输出操作

在结构化流中定义好DataFrame或者处理好DataFrame之后,调用writeStream()方法完成数据的输出操作。在输出的过程中,我们可以设置一些相关的属性,然后启动结构化流程序运行。
结构化流(Structured Streaming),sql,大数据,spark,kafka

输出模式

在进行数据输出的时候,必须通过outputMode来设置输出模式。输出模式提供了3种不同的模式:

  • 1- append模式:增量模式

    特点:当结构化程序处理数据的时候,如果有了新数据,才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作,直接报错。而且也不支持排序操作。如果有了排序,直接报错。

  • 2- complete模式:完全(全量)模式

    特点:当结构化程序处理数据的时候,每一次都是针对全量的数据进行处理。由于数据越来越多,所以在数据处理阶段,必须要有聚合操作。如果没有聚合操作,直接报错。另外还支持排序,但是不是强制要求。

  • 3- update模式:更新模式

    特点:支持聚合操作。当结构化程序处理数据的时候,如果处理阶段没有聚合操作,该模式效果和append模式是一致。如果有了聚合操作,只会输出有变化和新增的内容。但是不支持排序操作,如果有了排序,直接报错。

append模式:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('structured_streaming_wordcount')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    init_df.createTempView("tmp_table")

    # 3- 数据处理
    # 正常:没有聚合操作,也没有排序
    result_df = spark.sql("""
        select
            explode(split(value,' ')) as word
        from tmp_table
    """)

    # 异常:有聚合操作,没有排序
    # result_df = spark.sql("""
    #     select
    #         word,count(1) as cnt
    #     from (
    #         select
    #             explode(split(value,' ')) as word
    #         from tmp_table
    #     )
    #     group by word
    # """)

    # 异常:没有聚合操作,有排序
    # result_df = spark.sql("""
    #     select
    #         word
    #     from (
    #         select
    #             explode(split(value,' ')) as word
    #         from tmp_table
    #     )
    #     order by word
    # """)

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format('console').outputMode('append').start().awaitTermination()

如果有了聚合操作,会报如下错误:
结构化流(Structured Streaming),sql,大数据,spark,kafka
如果有了排序操作,会报如下错误:
结构化流(Structured Streaming),sql,大数据,spark,kafka

complete模式:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('structured_streaming_wordcount')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    init_df.createTempView("tmp_table")

    # 3- 数据处理
    # 异常:没有聚合操作
    # result_df = spark.sql("""
    #     select
    #         explode(split(value,' ')) as word
    #     from tmp_table
    # """)

    # 正常:有聚合操作,没有排序
    result_df = spark.sql("""
        select
            word,count(1) as cnt
        from (
            select
                explode(split(value,' ')) as word
            from tmp_table
        )
        group by word
        order by cnt
    """)

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()

如果没有聚合操作,会报如下错误:
结构化流(Structured Streaming),sql,大数据,spark,kafka
update模式:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('structured_streaming_wordcount')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    init_df.createTempView("tmp_table")

    # 3- 数据处理
    # 正常:没有聚合操作
    result_df = spark.sql("""
        select
            explode(split(value,' ')) as word
        from tmp_table
    """)

    # 正常:有聚合操作,没有排序
    # result_df = spark.sql("""
    #     select
    #         word,count(1) as cnt
    #     from (
    #         select
    #             explode(split(value,' ')) as word
    #         from tmp_table
    #     )
    #     group by word
    # """)

    # 异常:有排序
    result_df = spark.sql("""
        select
            word
        from (
            select
                explode(split(value,' ')) as word
            from tmp_table
        )
        order by word
    """)

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format('console').outputMode('update').start().awaitTermination()

如果有了排序操作,会报如下错误:
结构化流(Structured Streaming),sql,大数据,spark,kafka文章来源地址https://www.toymoban.com/news/detail-798844.html

到了这里,关于结构化流(Structured Streaming)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 三、计算机理论-关系数据库-结构化查询语言SQL

    SQL 概述 是一种介于关系代数与关系演算之间的语言,现成为关系数据库的标准语言 特点:综合统一、高度非过程化、面向集合的操作方式、以同一种语法结构提供两种使用方式(直接使用或者嵌入高级语言使用)、语言简洁,易学易用。 四大功能如下: SQL功能 动词 数据查

    2024年01月24日
    浏览(55)
  • SQL:结构化查询语言

    创建一张表并插入数据: 以下常用函数以MySQL为例,其它数据库类似

    2024年02月06日
    浏览(47)
  • 结构化数据、非结构化数据、半结构化数据

    结构化的数据一般是指可以使用关系型数据库表示和存储,可以用二维表来逻辑表达实现的数据。例如:需要多少个属性,每个属性什么类型,每个属性的取值范围等等,类似下图所示, 提前定义好了一个二维矩阵的元数据 ,包含有列名称、列的类型、列的约束等:   可见

    2024年02月09日
    浏览(62)
  • MySql003——SQL(结构化查询语言)基础知识

    DB:数据库(Database) 即存储数据的“仓库”,其本质是一个 文件系统 。它保存了一系列有组织的数据。 DBMS:数据库管理系统(Database Management System) 是一种操纵和管理数据库的 大型软件 (例如我们前面下载的MySQL软件),用于建立、使用和维护数据库,对数据库进行统一

    2024年02月15日
    浏览(44)
  • 【案例】--非结构化数据中台案例

    最近接触一个平台架构的讨论,公司需要一个非结构化数据中台,理念是能够满足存储随时变换的非结构化数据,另外引入低代码思想。由于非结构化数据是未知的,不同业务的数据是不同,为了更好的使用,低代码就需要一种方案,在尽量不开发代码下满足相关需求变化,

    2024年02月10日
    浏览(56)
  • 什么是T-SQL编程?T-SQL是Transact-SQL的缩写,是一种扩展了SQL(结构化查询语言)的编程语言,用于Microsoft SQL Server数据库管理系统中的数据管理和操作。T-

    什么是T-SQL编程? T-SQL是Transact-SQL的缩写,是一种 扩展了SQL(结构化查询语言) 的编程语言,用于Microsoft SQL Server数据库管理系统中的数据管理和操作。T-SQL支持创建 存储过程、触发器、函数 等高级特性,能够更加灵活地进行数据操作和处理。基本的T-SQL语法与标准SQL很相

    2024年01月21日
    浏览(89)
  • 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据

    【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据 本章节讲解一种自定义数据类型

    2024年02月10日
    浏览(45)
  • python序列化和结构化数据详解

    序列化和结构化数据是计算机程序中非常重要的概念,它们的原理和应用在许多应用程序中都是必不可少的。Python作为一种高级编程语言,在序列化和结构化数据方面提供了很多优秀的解决方案。在本文中,我们将详细介绍Python中序列化和结构化数据的相关概念和应用。 1.

    2024年02月08日
    浏览(58)
  • Spark Structured Streaming使用教程

    Structured Streaming是一个基于Spark SQL引擎的可扩展和容错流处理引擎,Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。 Structured Streaming把持续不断的流式数据当做一个不断追加的表,这使得新的流处理模型与批处理模型非常相似。您将把流计算表

    2024年02月03日
    浏览(55)
  • 一种使得大模型输出结构化数据的简易方法

    最近在用大模型跑一些数据,于是就不可避免的遇到了如何让大模型输出的格式便于处理这个问题。经过一些研究发现了一套比较有用的方法,在这里总结一下。 任务是这样的,我需要用大模型(比如ChatGPT,ChatGLM等等)对文本进行名词提取。输入一段文本,我需要大模型理

    2024年02月16日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包