Spark Structured Streaming使用教程

这篇具有很好参考价值的文章主要介绍了Spark Structured Streaming使用教程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


Structured Streaming是一个基于Spark SQL引擎的可扩展和容错流处理引擎,Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。
Structured Streaming把持续不断的流式数据当做一个不断追加的表,这使得新的流处理模型与批处理模型非常相似。您将把流计算表示为在静态表上的标准批处理查询,Spark将其作为无界输入表上的增量查询运行。

1、输入数据源

  • File source - 以数据流的形式读取写入目录中的文件。文件将按照文件修改时间的先后顺序进行处理。如果设置了latestFirst,则顺序将相反。支持的文件格式为text, CSV, JSON, ORC, Parquet。请参阅DataStreamReader接口的文档,了解最新的列表,以及每种文件格式支持的选项。注意,监视目录的文件改变,只能是原子性的改变,比如把文件放入该目录,而不是持续写入该目录中的某个文件。
  • Kafka source - 从Kafka读取数据。它兼容Kafka代理版本0.10.0或更高版本。查看Kafka集成指南了解更多细节。
  • Socket source (用于测试) - 从套接字连接读取UTF8文本数据。监听服务器套接字位于驱动程序。请注意,这应该仅用于测试,因为它不提供端到端的容错保证。
  • Rate source (用于测试) - 以每秒指定的行数生成数据,每个输出行包含一个时间戳和值。其中timestamp为包含消息发送时间的timestamp类型,value为包含消息计数的Long类型,从0开始作为第一行。该源代码用于测试和基准测试。

2、输出模式

  • 我们可以定义每次结果表中的数据更新时,以何种方式,将哪些数据写入外部存储。有3种模式:
  • complete mode:所有数据都会被写入外部存储。具体如何写入,是根据不同的外部存储自身来决定的。
  • append mode:只有新的数据,以追加的方式写入外部存储。只有当我们确定,result table中已有的数据是肯定不会被改变时,才应该使用append mode。
  • update mode:只有被更新的数据,包括增加的和修改的,会被写入外部存储中。
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

3、sink输出结果

  • File sink - 将输出存储到一个目录。
    输出模式支持Append
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
  • Kafka sink - 将输出存储到Kafka中的一个或多个主题。
    输出模式支持Append, Update, Complete
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
  • Console sink (用于测试) - 每次有触发器时,将输出打印到控制台/标准输出。支持两种输出模式:Append和Complete。这应该用于在低数据量上进行调试,因为在每次触发后都会收集整个输出并将其存储在驱动程序的内存中。
    输出模式支持Append, Update, Complete
writeStream
    .format("console")
    .start()
  • Memory sink (用于测试) - 输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
    输出模式支持Append, Complete
输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
  • 自定义输出Foreach和ForeachBatch - Foreach:针对每条数据的输出;ForeachBatch:针对每批次的数据输出。
    输出模式支持Append, Update, Complete
// Foreach
streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

// ForeachBatch
streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long>() {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }    
  }
).start();

4、时间窗口

4.1、时间窗口

在业务场景中,经常会遇到按时间段进行聚合操作,Spark提供了基于滑动窗口的事件时间集合操作,每个时间段作为一个分组,并对每个组内的每行数据进行聚合操作。
Spark Structured Streaming使用教程,java,大数据,spark,大数据

可以使用groupBy()和window()操作来表示窗口聚合。

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

4.2、时间水印(Watermarking)

WaterMarking的作用主要是为了解决:延迟到达的数据是否丢弃,系统可以删除过期的数据。
Spark Structured Streaming使用教程,java,大数据,spark,大数据

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();

5、使用例子

package com.penngo.spark;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.art.Art;

import java.io.Serializable;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

public class SparkStructStream {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static class DataTxt implements Serializable {
        private String text;
        private Timestamp time;

        public DataTxt(String text, LocalDateTime time) {
            this.text = text;
            this.time = Timestamp.valueOf(time);
        }
        public String getText() {
            return text;
        }
        public void setText(String text) {
            this.text = text;
        }
        public Timestamp getTime() {
            return time;
        }
        public void setTime(Timestamp time) {
            this.time = time;
        }
    }
    
    public static void socket(SparkSession spark) throws Exception{
        // 运行:nc -lk 9999
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load();
        Dataset<DataTxt> words = lines
                .as(Encoders.STRING())
                .map((MapFunction<String, DataTxt>) x -> {
                    String[] strs = x.split(",");

                    LocalDateTime date = LocalDateTime.parse(strs[1],formatter);
                    Arrays.asList(x.split(",")).iterator();
                    DataTxt data = new DataTxt(strs[0], date);
                    return data;
                }, Encoders.bean(DataTxt.class));

        Dataset<Row> wordCounts = words.toDF()
                .withWatermark("time", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃
                .groupBy(
                        window(col("time"), "10 minutes", "5 minutes"),
                        col("text"))
                .count();


        wordCounts.writeStream().outputMode("append")
                .foreach(new ForeachWriter<Row>() {
                    @Override public boolean open(long partitionId, long version) {
//                        System.out.println("open==========partitionId:" + partitionId + ",version:" + version);
                        return true;
                    }

                    @Override public void process(Row record) {
                        // Write string to connection
                        System.out.println("recordxxxxxxxxxxxxxxxxxx:======" + record);

                    }

                    @Override public void close(Throwable errorOrNull) {
                        // Close the connection
//                        System.out.println("close==========errorOrNull:" + errorOrNull);
                    }
                })
//                .format("console")
                .start().awaitTermination();
    }

    public static void kafka(SparkSession spark) throws Exception{
        // Subscribe to 1 topic
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "192.168.245.1:9092")
                .option("subscribe", "topic-news")
                .option("startingOffsets","latest")
                .option("maxOffsetsPerTrigger",1000)
                .load();
        
        df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
        df.printSchema();
        df.writeStream().outputMode("append")
                .format("console")
                .start().awaitTermination();
    }
    public static void main(String[] args) throws Exception{
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF);
        Logger.getLogger("org.apache.kafka").setLevel(Level.WARN);

        System.setProperty("hadoop.home.dir", "/usr/local/hadoop-3.3.6");
        System.setProperty("HADOOP_USER_NAME", "root");

        SparkSession spark = SparkSession
                .builder()
                .appName("SparkStructStream")
                .master("local[*]")
                .getOrCreate();

//        socket(spark);
        kafka(spark);
    }
}

参考自官方文档:https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html文章来源地址https://www.toymoban.com/news/detail-772239.html

到了这里,关于Spark Structured Streaming使用教程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“t”键分割,数据内容及数据格式如下: 项目环境说明 开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。 1、新创一个文件folder命名为li

    2024年02月13日
    浏览(46)
  • 在Spring Boot中使用Spark Streaming进行实时数据处理和流式计算

    引言: 在当今大数据时代,实时数据处理和流式计算变得越来越重要。Apache Spark作为一个强大的大数据处理框架,提供了Spark Streaming模块,使得实时数据处理变得更加简单和高效。本文将深入浅出地介绍如何在Spring Boot中使用Spark Streaming进行实时数据处理和流式计算,并提供

    2024年03月27日
    浏览(35)
  • 大数据——Spark Streaming

    Spark Streaming是一个可扩展、高吞吐、具有容错性的流式计算框架。 之前我们接触的spark-core和spark-sql都是离线批处理任务,每天定时处理数据,对于数据的实时性要求不高,一般都是T+1的。但在企业任务中存在很多的实时性的任务需求,列如双十一的京东阿里都会要求做一个

    2024年02月07日
    浏览(38)
  • Spark Streaming实时数据处理

    作者:禅与计算机程序设计艺术 Apache Spark™Streaming是一个构建在Apache Spark™之上的快速、微批次、容错的流式数据处理系统,它可以对实时数据进行高吞吐量、低延迟地处理。Spark Streaming既可用于流计算场景也可用于离线批处理场景,而且可以将结构化或无结构化数据源(如

    2024年02月06日
    浏览(40)
  • Spark Streaming实时流式数据处理

    作者:禅与计算机程序设计艺术 Apache Spark Streaming 是 Apache Spark 提供的一个用于高吞吐量、容错的流式数据处理引擎。它可以实时的接收数据并在系统内部以微批次的方式进行处理,并将结果输出到文件、数据库或实时消息系统中。Spark Streaming 支持 Java、Scala 和 Python 编程语言

    2024年02月08日
    浏览(39)
  • 大数据编程实验四:Spark Streaming

    一、目的与要求 1、通过实验掌握Spark Streaming的基本编程方法; 2、熟悉利用Spark Streaming处理来自不同数据源的数据。 3、熟悉DStream的各种转换操作。 4、熟悉把DStream的数据输出保存到文本文件或MySQL数据库中。 二、实验内容 1.参照教材示例,利用Spark Streaming对三种类型的基

    2024年02月03日
    浏览(43)
  • Spark Streaming 编程权威使用指南

    注意:本文档为Spark的旧版本Streaming引擎。Spark Streaming 不再更新,是一个遗留项目。在Spark中有一种新的、更易用的流处理引擎,称为结构化流式处理。您应该使用Spark结构化流处理来开发流式应用和流水线。请参阅结构化流式处理编程指南。 Spark Streaming 是 Spark 核心 API 的扩

    2024年02月04日
    浏览(43)
  • Spark Streaming + Kafka构建实时数据流

    1. 使用Apache Kafka构建实时数据流 参考文档链接:https://cloud.tencent.com/developer/article/1814030 2. 数据见UserBehavior.csv 数据解释:本次实战用到的数据集是CSV文件,里面是一百零四万条淘宝用户行为数据,该数据来源是阿里云天池公开数据集 根据这一csv文档运用Kafka模拟实时数据流,

    2024年02月12日
    浏览(33)
  • 实时大数据流处理技术:Spark Streaming与Flink的深度对比

    引言 在当前的大数据时代,企业和组织越来越多地依赖于实时数据流处理技术来洞察和响应业务事件。实时数据流处理不仅能够加快数据分析的速度,还能提高决策的效率和准确性。Apache Spark Streaming和Apache Flink是目前两个主要的实时数据流处理框架,它们各自拥有独特的特

    2024年03月10日
    浏览(49)
  • spark介绍之spark streaming

    Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、w

    2024年02月02日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包