pyspark之Structured Streaming file文件案例1

这篇具有很好参考价值的文章主要介绍了pyspark之Structured Streaming file文件案例1。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

# generate_file.py 
# 生成数据 生成500个文件,每个文件1000条数据
# 生成数据格式:eventtime name province action ()时间 用户名 省份 动作)
import os 
import time
import shutil
import time

FIRST_NAME = ['Zhao', 'Qian', 'Sun', 'Li', 'Zhou', 'Wu', 'Zheng', 'Wang']
SECOND_NAME = ['San', 'Si', 'Wu', 'Chen', 'Yang', 'Min', 'Jie', 'Qi']
PROVINCE = ['BeiJing', 'ShanDong', 'ShangHai', 'HeNan', 'HaErBin']
ACTION = ['login', 'logout', 'purchase']

PATH = "/opt/software/tmp/"
DATA_PATH = "/opt/software/tmp/data/"
    # 初始化环境
    def test_Setup():
        if os.path.exists(DATA_PATH):
            shutil.rmtree(DATA_PATH)
        os.mkdir(DATA_PATH)

    # 清理数据,恢复测试环境
    def test_TearDown():
        shutile.rmtree(DATA_PATH)

    # 数据保存文件
    def writeAndMove(filename,content):
        with open(PATH+filename,'wt',encoding='utf-8') as f:
            f.write(content)
        shutil.move(PATH+filename,DATA_PATH+filename)

if __name__ == '__main__':

    test_Setup()
    
    for i in range(500):
        filename = "user_action_{}.log".format(i)
        """
        验证spark输出模式,complete和update,增加代码,第一个文件i=0时,设置PROVINCE = "TAIWAN"
        """
        if i == 0:
            province= ['TaiWan']
        else:
            province = PROVINCE
        content = ""
        for _ in range(1000):
            content += "{} {} {} {}\n".format(str(int(time.time())),random.choice(FIRST_NAME)+random.choice(SECOND_NAME),random.choice(province),random.choice(ACTION))
        writeAndMove(filename,content)    
        time.sleep(10)

# spark_file_test.py  
# 读取DATA文件夹下面文件,按照省份统计数据,主要考虑window情况,按照window情况测试,同时针对    outputMode和输出console和mysql进行考虑,其中保存到mysql时添加batch字段

from pyspark.sql import SparkSession,DataFrame
from pyspark.sql.functions import split,lit,from_unixtime

DATA_PATH = "/opt/software/tmp/data/"

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    lines = spark.readStream.format("text").option("seq","\n").load(DATA_PATH)
    # 分隔符为空格
    userinfo = lines.select(split(lines.value," ").alias("info"))
    # 第一个为eventtime  第二个为name   第三个为province  第四个为action 
    # userinfo['info'][0]等同于userinfo['info'].getIterm(0)
    user = userinfo.select(from_unixtime(userinfo['info'][0]).alias('eventtime'),
        userinfo['info'][1].alias('name'),userinfo['info'][2].alias('province'),
        userinfo['info'][3].alias('action'))
    """
    测试1:数据直接输出到控制台,由于没有采用聚合,输出模式选择update
    user.writeStream.outputMode("update").format("console").trigger(processingTime="8 seconds").start().awaitTermination()
    """
    """
    测试2:数据存储到数据库,新建数据库表,可以通过printSchema()查看数据类型情况
    def insert_into_mysql_batch(df:DataFrame,batch):
        if df.count()>0:
        # 此处将batch添加到df中,采用lit函数
            data = df.withColumn("batch",lit(batch))
            data.write.format("jdbc"). \
        option("driver","com.mysql.jdbc.Driver"). \
        option("url","jdbc:mysql://localhost:3306/spark").option("user","root").\
        option("password","root").option("dbtable","user_log").\
        option("batchsize",1000).mode("append").save()    
        else:
            pass
user.writeStream.outputMode("update").foreachBatch((insert_into_mysql_batch)).trigger(processingTime="20 seconds").start().awaitTermination()
    """
    """
    测试3:数据按照省份统计后,输出到控制台,分析complete和update输出模式区别,针对该问题,调整输入,province="TaiWan"只会输入1次,即如果输出方式complete,则每batch都会输出,update的话,只会出现在一个batch
    userProvinceCounts = user.groupBy("province").count()
    userProvinceCounts = userProvinceCounts.select(userProvinceCounts['province'],userProvinceCounts["count"].alias('sl'))
# 测试输出模式complete:complete将总计算结果都进行输出
"""
batch 0
TaiWan 1000
batch 1
TaiWan 1000
其他省份  sl
batch 2
TaiWan 1000
其他省份  sl
"""    userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination() 
# 测试输出模式update:update只输出相比上个批次变动的内容(新增或修改)
batch 0
TaiWan 1000  
batch 1 中没有TaiWan输出
userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination() 
    """文章来源地址https://www.toymoban.com/news/detail-811633.html

到了这里,关于pyspark之Structured Streaming file文件案例1的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Structured Streaming: Apache Spark的流处理引擎

    欢迎来到我们的技术博客!今天,我们要探讨的主题是Apache Spark的一个核心组件——Structured Streaming。作为一个可扩展且容错的流处理引擎,Structured Streaming使得处理实时数据流变得更加高效和简便。 Structured Streaming是基于Apache Spark SQL引擎构建的高级流处理框架。它允许用户

    2024年01月25日
    浏览(53)
  • 结构化流(Structured Streaming)

    有界数据: 无界数据: 结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL … Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针

    2024年01月17日
    浏览(56)
  • Structured_Streaming和Kafka整合

    默认情况下,Spark的结构化流支持多种输出方案: File Sink foreach sink 允许对输出的数据进行任意的处理操作,具体如何处理由用户自定义函数决定。对输出的数据一个个进行处理操作。 使用方式主要有二种 方式一: 方式二:这种方式的适用场景是需要和资源打交道的情况(

    2024年01月19日
    浏览(61)
  • 【头歌实训】PySpark Streaming 入门

    本关任务:使用 Spark Streaming 实现词频统计。 为了完成本关任务,你需要掌握: Spark Streaming 简介; Python 与 Spark Streaming; Python Spark Streaming API; Spark Streaming 初体验(套接字流)。 Spark Streaming 简介 Spark Streaming 是 Spark 的核心组件之一,为 Spark 提供了可拓展、高吞吐、容错的

    2024年02月04日
    浏览(43)
  • spark介绍之spark streaming

    Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、w

    2024年02月02日
    浏览(35)
  • Spark(39):Streaming DataFrame 和 Streaming DataSet 输出

    目录 0. 相关文章链接 1. 输出的选项 2. 输出模式(output mode) 2.1. Append 模式(默认) 2.2. Complete 模式 2.3. Update 模式 2.4. 输出模式总结 3. 输出接收器(output sink) 3.1. file sink 3.2. kafka sink 3.2.1. 以 Streaming 方式输出数据 3.2.2. 以 batch 方式输出数据 3.3. console sink 3.4. memory sink 3.5. fo

    2024年02月13日
    浏览(37)
  • Spark编程实验四:Spark Streaming编程

    目录 一、目的与要求 二、实验内容 三、实验步骤 1、利用Spark Streaming对三种类型的基本数据源的数据进行处理 2、利用Spark Streaming对Kafka高级数据源的数据进行处理 3、完成DStream的两种有状态转换操作 4、把DStream的数据输出保存到文本文件或MySQL数据库中 四、结果分析与实验

    2024年02月03日
    浏览(38)
  • Spark面试整理-解释Spark Streaming是什么

    Spark Streaming是Apache Spark的一个组件,它用于构建可扩展、高吞吐量、容错的实时数据流处理应用。Spark Streaming使得可以使用Spark的简单编程模型来处理实时数据。以下是Spark Streaming的一些主要特点: 1. 微批处理架构 微批处理: Spark Streaming的核心是微批处理模型。它将实

    2024年04月13日
    浏览(48)
  • Spark的生态系统概览:Spark SQL、Spark Streaming

    Apache Spark是一个强大的分布式计算框架,用于大规模数据处理。Spark的生态系统包括多个组件,其中两个重要的组件是Spark SQL和Spark Streaming。本文将深入探讨这两个组件,了解它们的功能、用途以及如何在Spark生态系统中使用它们。 Spark SQL是Spark生态系统中的一个核心组件,它

    2024年02月01日
    浏览(40)
  • Spark Streaming(头歌)

    第1关:套接字流实现黑名单过滤  代码: import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds object TransformBlackList {   def main(args: Array[String]): Unit = {     /********** Begin **********/  //初始化     val sparkConf = new SparkConf

    2024年02月05日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包