spark 经典demo 的 scala 和 java 实现

这篇具有很好参考价值的文章主要介绍了spark 经典demo 的 scala 和 java 实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

spark 经典demo 的 scala 和 java 实现,spark 教程,spark,scala,大数据

💐💐扫码关注公众号,回复 spark 关键字下载geekbang 原价 90 元 零基础入门 Spark 学习资料💐💐

准备 maven 依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.5.1</version>
    </dependency>

先上代码 

WordCount(单词计数)

要先对文件中的单词做统计计数,然后再打印出频次最高的 5 个单词,江湖人称“Word Count”wikiOfSpark.txt 文件下载地址:这里

scala 实现

import org.apache.spark.rdd.RDD
 
// 这里的下划线"_"是占位符,代表数据文件的根目录
val rootPath: String = "/Users/mustafa/Documents/databases/bigdata/spark/spark_core"
val file: String = s"${rootPath}/wikiOfSpark.txt"
 
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
 
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))
 
// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
 
// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

java实现

package com.mustafa.mynetty;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.jetbrains.annotations.NotNull;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class WordCount
{
    public static void main( String[] args )
    {
        JavaSparkContext context = getJavaSparkContext();
        String file = "hdfs://node1:8020/bigdata/spark/wordCount/datas/wikiOfSpark.txt";
        JavaRDD<String> lineRDD = context.textFile(file, 3);
        JavaRDD<String> wordRDD = lineRDD.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
        List<String> wordCountWords = Arrays.asList("Apache", "Spark");
        Broadcast<List<String>> bc = context.broadcast(wordCountWords);
        JavaRDD<String> cleanWordRDD = wordRDD.filter((Function<String, Boolean>) word -> bc.value().contains(word));
        JavaPairRDD<String, Integer> kvRDD = cleanWordRDD.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word,1));
        JavaPairRDD<String, Integer> wordCounts = kvRDD.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);
        JavaPairRDD<Integer, String> wordCountsRevert = wordCounts.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) kv -> new Tuple2<>(kv._2(), kv._1()));
        JavaPairRDD<Integer, String> wordCountsSorted =wordCountsRevert.sortByKey(false);
        String targetPath = "hdfs://node1:8020/bigdata/spark/wordCount/result";
        wordCountsSorted.saveAsTextFile(targetPath);
//        List<Tuple2<Integer, String>> result = wordCountsSorted.collect();
//        result.forEach(row -> System.out.println(row._2() + ":" + row._1()));
    }

    @NotNull
    private static JavaSparkContext getJavaSparkContext() {
        SparkConf conf = new SparkConf();
        conf.setMaster("spark://node0:7077");
        conf.setAppName("WordCount");
        conf.set("spark.executor.instances", "2");
        conf.set("spark.cores.max", "6");
        conf.set("spark.executor.cores", "3");
        conf.set("spark.executor.memory", "2g");
        conf.set("spark.memory.fraction", "0.9");
        conf.set("spark.memory.storageFraction", "0.1");
        JavaSparkContext context = new JavaSparkContext(conf);
        return context;
    }
}

小汽车摇号分析

 为了限制机动车保有量,从 2011 年开始,北京市政府推出了小汽车摇号政策。随着摇号进程的推进,在 2016 年,为了照顾那些长时间没有摇中号码牌的“准司机”,摇号政策又推出了“倍率”制度。

所谓倍率制度,它指的是,结合参与摇号次数,为每个人赋予不同的倍率系数。有了倍率加持,大家的中签率就由原来整齐划一的基础概率,变为“基础概率 * 倍率系数”。参与摇号的次数越多,倍率系数越大,中签率也会相应得到提高。

不过,身边无数的“准司机”总是跟我说,其实倍率这玩意没什么用,背了 8 倍、10 倍的倍率,照样摇不上!那么今天这一讲,咱们就来借着学习 Spark SQL 的机会,用数据来为这些还没摸过车的“老司机”答疑解惑,帮他们定量地分析一下,倍率与中签率之间,到底有没有关系?

2011 年到 2019 年北京市小汽车的摇号数据,你可以通过这个地址,从网盘进行下载,提取码为 ajs6

scala 实现

//spark-shell --conf spark.executor.memory=4g --conf spark.driver.memory=4g
import org.apache.spark.sql.DataFrame
 
val rootPath: String = "/Users/mustafa/Documents/databases/bigdata/spark/spark_sql"
// 申请者数据
val hdfs_path_apply: String = s"${rootPath}/apply"
// spark是spark-shell中默认的SparkSession实例
// 通过read API读取源文件
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)
// 数据打印
applyNumbersDF.show
 
// 中签者数据
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// 通过read API读取源文件
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)
// 数据打印
luckyDogsDF.show

// 过滤2016年以后的中签数据,且仅抽取中签号码carNum字段
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")
 
// 摇号数据与中签数据做内关联,Join Key为中签号码carNum
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")

// 以batchNum、carNum做分组,统计倍率系数
val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum")).agg(count(lit(1)).alias("multiplier")) 

// 以carNum做分组,保留最大的倍率系数
val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum").agg(max("multiplier").alias("multiplier")) 

// 以multiplier倍率做分组,统计人数
val result: DataFrame = uniqueMultipliers.groupBy("multiplier").agg(count(lit(1)).alias("cnt")).orderBy("multiplier") 

result.collect

Java 实现

package com.mustafa.mynetty;

import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import scala.Enumeration;

import java.util.List;

public class CarsBet {

    public static void main(String[] args) {
        SparkSession session = SparkSession
                .builder()
                .appName("CarsBet")
                .master("spark://node0:7077")
                .config("spark.executor.instances", "2")
                .config("spark.cores.max", "8")
                .config("spark.executor.cores", "3")
//                .config("spark.executor.cores", "8")
                .config("spark.executor.memory", "2g")
//                .config("spark.executor.memory", "4g")
                .config("spark.memory.fraction", "0.9")
                .config("spark.memory.storageFraction", "0.2")
                .getOrCreate();

        String rootPath = "hdfs://node1:8020/bigdata/spark/carsBet/datas";
        String hdfs_path_apply = rootPath + "/apply";
        Dataset<Row> applyNumbersDF = session.read().parquet(hdfs_path_apply);

        String hdfs_path_lucky = rootPath + "/lucky";
        Dataset<Row> luckyDogsDF = session.read().parquet(hdfs_path_lucky);
        Dataset<Row> filteredLuckyDogs = luckyDogsDF.filter("batchNum >= 201601").select("carNum");
        Dataset<Row> filteredLuckyDogsCache = functions.broadcast(filteredLuckyDogs);

        Dataset<Row> jointDF = applyNumbersDF.join(filteredLuckyDogsCache, applyNumbersDF.col("carNum").equalTo(filteredLuckyDogsCache.col("carNum")), "inner").drop(filteredLuckyDogsCache.col("carNum"));
        Dataset<Row> jointDFCache = functions.broadcast(jointDF);

        Dataset<Row> multipliers = jointDFCache.groupBy("carNum", "batchNum").agg(functions.count(functions.lit(1)).alias("multiplier"));
        Dataset<Row> multipliersCache = functions.broadcast(multipliers);

        Dataset<Row> uniqueMultipliers = multipliersCache.groupBy("carNum").agg(functions.max("multiplier").alias("multiplier"));
        Dataset<Row> uniqueMultipliersCache = functions.broadcast(uniqueMultipliers);

        Dataset<Row> result = uniqueMultipliersCache.groupBy("multiplier").agg(functions.count(functions.lit(1)).alias("cnt")).orderBy("multiplier");

        result.write().format("csv").option("header", true).mode("overwrite").save("hdfs://node1:8020/bigdata/spark/carsBet/result");

//        result.cache();
//        result.count();
//
//        result.show();
//
//        List<Row> list = result.collectAsList();
//        list.forEach(row -> System.out.println(row.getLong(0) + ":" + row.getLong(1)));
    }

}

流动的 wordCount

在之前的 Word Count 中,数据以文件(wikiOfSpark.txt)的形式,一次性地“喂给”Spark,从而触发一次 Job 计算。而在“流动的 Word Count”里,数据以行为粒度,分批地“喂给”Spark,每一行数据,都会触发一次 Job 计算。

具体来说,使用 netcat 工具,向本地 9999 端口的 Socket 地址发送数据行。而 Spark 流处理应用,则时刻监听着本机的 9999 端口,一旦接收到数据条目,就会立即触发计算逻辑的执行。这里的计算逻辑,就是 Word Count。计算执行完毕之后,流处理应用再把结果打印到终端(Console)上。

打开一个终端,在 9999 端口喂数据。linux nc -lk 9999 ,mac nc -l -p 9999文章来源地址https://www.toymoban.com/news/detail-859620.html

scala 实现

import org.apache.spark.sql.DataFrame
 
// 设置需要监听的本机地址与端口号
val host: String = "127.0.0.1"
val port: String = "9999"

// 从监听地址创建DataFrame
var df: DataFrame = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.load()

// 首先把接收到的字符串,以空格为分隔符做拆分,得到单词数组words
df = df.withColumn("words", split($"value", " ")) 
// 把数组words展平为单词word
.withColumn("word", explode($"words"))
// 以单词word为Key做分组
.groupBy("word")
// 分组计数
.count()

df.writeStream
// 指定Sink为终端(Console)
.format("console")
// 指定输出选项
.option("truncate", false)
// 指定输出模式
.outputMode("complete")
//.outputMode("update")
// 启动流处理应用
.start()
// 等待中断指令
.awaitTermination()

java实现

package com.mustafa.mynetty;

import org.apache.spark.sql.*;
import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.util.concurrent.TimeoutException;

public class StreamingWordCount {

    public static void main(String[] args) {

        SparkSession session = SparkSession
                .builder()
                .appName("CarsBet")
                .master("local[*]")
                .config("spark.executor.instances", "2")
                .config("spark.cores.max", "6")
                .config("spark.executor.cores", "3")
                .config("spark.executor.memory", "2g")
                .config("spark.memory.fraction", "0.9")
                .config("spark.memory.storageFraction", "0.1")
                .getOrCreate();

        String host = "127.0.0.1";
        String port = "9999";

        Dataset<Row> df = session.readStream().format("socket").option("host", host).option("port", port).load();
        df = df.withColumn("inputs", functions.split(df.col("value"), ","))
                .withColumn("eventTime", functions.element_at(new Column("inputs"), 1).cast("timestamp"))
                .withColumn("words", functions.split(functions.element_at(new Column("inputs"), 2), " "))
                .withColumn("word", functions.explode(new Column("words")))
                .withWatermark("eventTime", "10 minute")
                .groupBy(functions.window(new Column("eventTime"), "5 minute"), new Column("word"))
                .count();

        Trigger trigger = new ProcessingTimeTrigger(5000);

        try {
            df.writeStream()
                    .trigger(trigger)
                    .option("checkpointLocation", "hdfs://node1:8020/software/spark/check/" + Long.toString(System.currentTimeMillis()))
                    .format("console")
                    .option("truncate", "false")
                    .outputMode("update")
                    .start()
                    .awaitTermination();
        } catch (StreamingQueryException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

}

集群运行

spark-submit \
--master spark://node0:7077 \
--deploy-mode cluster \
--class com.mustafa.mynetty.CarsBet \
hdfs://node1:8020/software/spark-demo-1.0-SNAPSHOT.jar

附录

文件上传到 hdfs 文件系统(小汽车摇号分析)

package com.mustafa.mynetty;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

public class CarsBetUploader {

    private static FileSystem getFileSystem() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(conf);
        return fileSystem;
    }

    private static void uploadHdfsFile(File localFile, String targetPath) throws IOException {
        FileSystem fileSystem = getFileSystem();
        fileSystem.delete(new Path(targetPath), true);

        try (FileInputStream inStream = new FileInputStream(localFile); FSDataOutputStream outStream = fileSystem.create(new Path(targetPath))) {
            IOUtils.copy(inStream, outStream, 4096);
        }
    }

    private static void readHdfsFile(String hdfsFilePath) throws IOException {
        //预处理,删除文件加
        FileSystem fileSystem = getFileSystem();
        Path path = new Path(hdfsFilePath);
        //用文件系统操作路径
        try (FSDataInputStream inStream = fileSystem.open(path)) {
            IOUtils.copy(inStream, System.out, 4096);
        }
    }

    public static void main(String[] args) throws Exception {
//        String hdfsFilePath = "hdfs://node1:8020/bigdata/spark/wordCount/datas/wikiOfSpark.txt";
//        readHdfsFile(hdfsFilePath);

        String sourceRootPath = "/Users/mustafa/Documents/databases/bigdata/spark/spark_sql";
        String targetRootPath = "hdfs://node1:8020/bigdata/spark/carsBet/datas";
        String[] subFolders = new String[]{"apply", "lucky"};
        for (String subFolder : subFolders) {
            String sourcePath = sourceRootPath + "/" + subFolder;
            String targetPath = targetRootPath + "/" + subFolder;
            File sourcePathFile = new File(sourcePath);
            File[] sourceList = sourcePathFile.listFiles();
            for (File source : sourceList) {
                if (source.isDirectory() && source.getName().startsWith("batchNum=")) {
                    String target = targetPath + "/" + source.getName();
                    File[] sourcePartitionsList = source.listFiles();
                    Arrays.asList(sourcePartitionsList).parallelStream().forEach(sourcePartition -> {
                        if (sourcePartition.isFile() && sourcePartition.getName().startsWith("part-")) {
                            String targetPartition = target + "/" + sourcePartition.getName();
                            System.out.println("upload " + targetPartition);
                            try {
                                uploadHdfsFile(sourcePartition, targetPartition);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }

    }

}

hadoop hdfs 配置

core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
            <name>fs.defaultFS</name>
            <value>hdfs://node1:8020</value>
    </property>
    <property>
            <name>hadoop.tmp.dir</name>
            <value>/opt/software/hadoop/temp</value>
    </property>
	<property>
            <name>fs.trash.interval</name>
            <value>86400</value>
    </property>
    <property>
            <name>io.file.buffer.size</name>
            <value>131072</value>
    </property>
</configuration>

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<property>
		<name>dfs.permissions</name>
		<value>false</value>
	</property>
    <property>
            <name>dfs.namenode.http-address</name>
            <value>myserver2:9870</value>
    </property>
	<property>
		<name>dfs.namenode.secondary.http-address</name>
		<value>myserver2:9868</value>
	</property>
	<property>
		<name>dfs.replication</name>
		<value>2</value>
	</property>
</configuration>

hadoop-env.sh

export LANG=en_US.UTF-8
export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)}
export JAVA_HOME=/opt/software/jdk
export HADOOP_HOME=/opt/software/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_LOG_DIR=$HADOOP_HOME/logs
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

spark 配置

spark-defaults.conf

spark.master                     spark://192.168.31.125:7077
spark.local.dir			 /opt/software/spark/temp
spark.sql.autoBroadcastJoinThreshold	1g
spark.sql.adaptive.enabled	 true
spark.ui.port			 4040
spark.executor.cores		 3
spark.executor.memory		 2g
spark.memory.fraction		 0.9
spark.memory.storageFraction	 0.1
spark.eventLog.enabled		 true
spark.eventLog.dir		 hdfs://node1:8020/software/spark/history
spark.driver.memory              1g

spark-env.sh

#!/usr/bin/env bash

SPARK_MASTER_HOST=node0
HADOOP_CONF_DIR=/opt/software/hadoop/etc/hadoop
SPARK_EXECUTOR_CORES=3
SPARK_EXECUTOR_MEMORY=2g
JAVA_HOME=/opt/software/jdk
SPARK_HOME=/opt/software/spark

到了这里,关于spark 经典demo 的 scala 和 java 实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【spark】java类在spark中的传递,scala object在spark中的传递

    记录一个比较典型的问题,先讲一下背景,有这么一个用java写的类 然后在spark中使用的时候: 原因: scala的object对应的就是java的静态成员,可以反过来理解java的所有静态成员可被抽取成伴生对象(虽然现实中是scala最终编译成java)。以上面的JavaClass0 例子可理解为等价的

    2024年02月11日
    浏览(26)
  • Spark Scala大数据编程实例

    Scala是一门现代的多范式编程语言,平滑地集成了面向对象和函数式语言的特性,旨在以简练、优雅的方式来表达常用编程模式。Scala的设计吸收借鉴了许多种编程语言的思想,只有很少量特点是Scala自己独有的。Scala语言的名称来自于“可伸展的语言”,从写个小脚本到建立

    2024年02月04日
    浏览(39)
  • 如何在Spark Scala/Java应用中调用Python脚本

    本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同 1.PythonRunner 对于运行与 JVM 上的程序(即Scala、Java程序),Spark 提供了 PythonRunner 类。只需要调用PythonRunner 的main方法,就可以在Scala或Java程序中调用Python脚本。在实现上,PythonRunner 基于

    2023年04月24日
    浏览(44)
  • Spark 读写 es 数据(scala 版)

    读取 hdfs 文件 解析采用 fast-json : 1、 pom.xml 2、 main 文件 运行结果: 1、 pom.xml 2、 main 文件 参考文章 Spark读写ES数据时遇到的问题总结 Spark读写ES 使用Apache Spark将数据写入ElasticSearch

    2024年02月11日
    浏览(31)
  • spring boot java项目整合Scala&Spark,接口api调用方式调用scala代码,配置分享

    版本说明: spring boot: 2.5.9 jdk:1.8 spark:2.4.5 sclala:2.11.12 首先你需要有一个完美的spring boot项目(java版本)能成功运行,这就不赘述了,按照网上的自己搭建吧,然后重要的来了,我捣鼓了两天时间,各样的报错见过了,网上的处理方法要嘛是不全,要嘛是没有用,各种办

    2024年02月10日
    浏览(42)
  • 用idea工具scala 和 Java开发 spark案例:WordCount

    目录 一 环境准备 二 scala代码编写 三 java 代码编写         创建一个 maven 工程         添加下列依赖         原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改                  首先准备好数据,即

    2024年02月07日
    浏览(38)
  • 简单使用Spark、Scala完成对天气数据的指标统计

    目录 一、前言   什么是Spark?   什么是Scala 二、数据准备(数据类型的转换) 三、Spark部分 1、使用Spark完成数据中的“风级”,“风向”、“天气情况”相关指标统计及筛选 四、Scala部分 1、使用Scala统计某月、全年的温差、平均气温以及最值等相关的指标 五、遇到的问题

    2024年02月03日
    浏览(37)
  • Spark集群搭建记录 | 云计算[CentOS7] | Scala Maven项目访问Spark(local模式)实现单词计数

    本系列文章索引以及一些默认好的条件在 传送门 要想完成Spark的配置,首先需要完成HadoopSpark的配置 Hadoop配置教程:链接 若未进行明确说明,均按照root用户操作 本来在Eclipse的Marketplace便可以下载,可是现在官网都寄了 所以说只好找到之前的版本凑合来用 下载链接 这个软件

    2024年02月05日
    浏览(44)
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    对于每一门编程语言来说,数组(Array)都是重要的数据结构之一,主要用来存储数据类型相同的元素。Scala中的数组分为定长数组和变长数组,定义定长数组,需要使用new,而定义变长数组时,则需要导包 import scala.collection.mutable.ArrayBuffer 。 数组(Array)主要用来存储

    2024年02月10日
    浏览(44)
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-1)

    Spark是专为大规模数据处理而设计的快速通用的计算引擎,它是由Scala语言开发实现的,关于大数据技术,本身就是计算数据,而Scala既有面向对象组织项目工程的能力,又具备计算数据的功能,同时Spark和Scala的紧密集成,本书将采用Scala语言开发Spark程序,所以学好Scala将有助

    2024年02月11日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包