Prepare the Maven dependencies
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.1</version>
</dependency>
Let's start with the code.
Word Count
The task: count the occurrences of each word in a file, then print the 5 most frequent words, the routine known everywhere as "Word Count". The wikiOfSpark.txt file can be downloaded here.
Scala implementation
import org.apache.spark.rdd.RDD
// Root directory of the data files
val rootPath: String = "/Users/mustafa/Documents/databases/bigdata/spark/spark_core"
val file: String = s"${rootPath}/wikiOfSpark.txt"
// Read the file contents
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// Split each line into words
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))
// Convert RDD elements into (Key, Value) pairs
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// Count occurrences grouped by word
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// Take the 5 most frequent words
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
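To print the result in a more readable "word: count" form, the same pipeline can end with an explicit foreach; a minimal sketch (the actual counts depend on the input file):
wordCounts.map{ case (k, v) => (v, k) }
  .sortByKey(false)
  .take(5)
  .foreach{ case (count, word) => println(s"$word: $count") }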
Java implementation
package com.mustafa.mynetty;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
public class WordCount
{
public static void main( String[] args )
{
JavaSparkContext context = getJavaSparkContext();
String file = "hdfs://node1:8020/bigdata/spark/wordCount/datas/wikiOfSpark.txt";
JavaRDD<String> lineRDD = context.textFile(file, 3);
JavaRDD<String> wordRDD = lineRDD.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
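// Unlike the Scala version, this variant broadcasts a small allow-list and keeps only the words "Apache" and "Spark"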
List<String> wordCountWords = Arrays.asList("Apache", "Spark");
Broadcast<List<String>> bc = context.broadcast(wordCountWords);
JavaRDD<String> cleanWordRDD = wordRDD.filter((Function<String, Boolean>) word -> bc.value().contains(word));
JavaPairRDD<String, Integer> kvRDD = cleanWordRDD.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word,1));
JavaPairRDD<String, Integer> wordCounts = kvRDD.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);
JavaPairRDD<Integer, String> wordCountsRevert = wordCounts.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) kv -> new Tuple2<>(kv._2(), kv._1()));
JavaPairRDD<Integer, String> wordCountsSorted = wordCountsRevert.sortByKey(false);
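// Write the sorted (count, word) pairs to HDFS as text files instead of collecting them to the driver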
String targetPath = "hdfs://node1:8020/bigdata/spark/wordCount/result";
wordCountsSorted.saveAsTextFile(targetPath);
// List<Tuple2<Integer, String>> result = wordCountsSorted.collect();
// result.forEach(row -> System.out.println(row._2() + ":" + row._1()));
}
private static JavaSparkContext getJavaSparkContext() {
SparkConf conf = new SparkConf();
conf.setMaster("spark://node0:7077");
conf.setAppName("WordCount");
conf.set("spark.executor.instances", "2");
conf.set("spark.cores.max", "6");
conf.set("spark.executor.cores", "3");
conf.set("spark.executor.memory", "2g");
conf.set("spark.memory.fraction", "0.9");
conf.set("spark.memory.storageFraction", "0.1");
JavaSparkContext context = new JavaSparkContext(conf);
return context;
}
}
Beijing license-plate lottery analysis
To cap the number of motor vehicles, the Beijing municipal government introduced a license-plate lottery in 2011. As the lottery went on, a "multiplier" scheme was added in 2016 to give a better chance to would-be drivers who had gone a long time without winning a plate.
Under the multiplier scheme, each applicant is assigned a multiplier based on how many lottery rounds they have taken part in. With the multiplier, the winning rate changes from a uniform base probability to "base probability * multiplier": the more rounds you have entered, the larger your multiplier, and the higher your chance of winning.
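As a purely illustrative example: with a hypothetical base probability of 0.05% per round, a multiplier of 10 would raise the effective winning rate to 0.05% * 10 = 0.5%; these figures are assumptions, not actual lottery statistics.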
Yet plenty of would-be drivers keep telling me the multiplier is useless: even with a multiplier of 8 or 10, they still never win. So in this section, while learning Spark SQL, let's use the data to answer that question quantitatively: is there actually a relationship between the multiplier and the winning rate?
The 2011 to 2019 Beijing license-plate lottery data can be downloaded from a network drive via this address, extraction code ajs6.
Scala implementation
//spark-shell --conf spark.executor.memory=4g --conf spark.driver.memory=4g
import org.apache.spark.sql.DataFrame
val rootPath: String = "/Users/mustafa/Documents/databases/bigdata/spark/spark_sql"
// Applicant data
val hdfs_path_apply: String = s"${rootPath}/apply"
// spark is the default SparkSession instance in spark-shell
// Read the source files through the read API
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)
// Print the data
applyNumbersDF.show
// Winner data
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// Read the source files through the read API
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)
// Print the data
luckyDogsDF.show
// Keep only winners from 2016 onward, and select just the winning number carNum
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")
// Inner-join the application data with the winner data on carNum
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")
// Group by batchNum and carNum to compute the multiplier
val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum")).agg(count(lit(1)).alias("multiplier"))
// Group by carNum and keep the largest multiplier
val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum").agg(max("multiplier").alias("multiplier"))
// Group by multiplier and count the number of people
val result: DataFrame = uniqueMultipliers.groupBy("multiplier").agg(count(lit(1)).alias("cnt")).orderBy("multiplier")
result.collect
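Equivalently, the same analysis can be written as plain SQL over temporary views. The sketch below is a minimal equivalent of the DataFrame pipeline above (the view names apply and lucky are illustrative):
applyNumbersDF.createOrReplaceTempView("apply")
luckyDogsDF.createOrReplaceTempView("lucky")
val resultSQL: DataFrame = spark.sql("""
SELECT multiplier, count(1) AS cnt
FROM (
  SELECT carNum, max(multiplier) AS multiplier
  FROM (
    SELECT a.carNum, a.batchNum, count(1) AS multiplier
    FROM apply a
    JOIN (SELECT carNum FROM lucky WHERE batchNum >= '201601') l
      ON a.carNum = l.carNum
    GROUP BY a.carNum, a.batchNum
  ) t1
  GROUP BY carNum
) t2
GROUP BY multiplier
ORDER BY multiplier
""")
resultSQL.collect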
Java implementation
package com.mustafa.mynetty;
import org.apache.spark.sql.*;
import java.util.List;
public class CarsBet {
public static void main(String[] args) {
SparkSession session = SparkSession
.builder()
.appName("CarsBet")
.master("spark://node0:7077")
.config("spark.executor.instances", "2")
.config("spark.cores.max", "8")
.config("spark.executor.cores", "3")
// .config("spark.executor.cores", "8")
.config("spark.executor.memory", "2g")
// .config("spark.executor.memory", "4g")
.config("spark.memory.fraction", "0.9")
.config("spark.memory.storageFraction", "0.2")
.getOrCreate();
String rootPath = "hdfs://node1:8020/bigdata/spark/carsBet/datas";
String hdfs_path_apply = rootPath + "/apply";
Dataset<Row> applyNumbersDF = session.read().parquet(hdfs_path_apply);
String hdfs_path_lucky = rootPath + "/lucky";
Dataset<Row> luckyDogsDF = session.read().parquet(hdfs_path_lucky);
Dataset<Row> filteredLuckyDogs = luckyDogsDF.filter("batchNum >= 201601").select("carNum");
Dataset<Row> filteredLuckyDogsCache = functions.broadcast(filteredLuckyDogs);
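// functions.broadcast only attaches a broadcast-join hint, so it influences the join below; wrapping the groupBy inputs further down has no effect on the aggregations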
Dataset<Row> jointDF = applyNumbersDF.join(filteredLuckyDogsCache, applyNumbersDF.col("carNum").equalTo(filteredLuckyDogsCache.col("carNum")), "inner").drop(filteredLuckyDogsCache.col("carNum"));
Dataset<Row> jointDFCache = functions.broadcast(jointDF);
Dataset<Row> multipliers = jointDFCache.groupBy("carNum", "batchNum").agg(functions.count(functions.lit(1)).alias("multiplier"));
Dataset<Row> multipliersCache = functions.broadcast(multipliers);
Dataset<Row> uniqueMultipliers = multipliersCache.groupBy("carNum").agg(functions.max("multiplier").alias("multiplier"));
Dataset<Row> uniqueMultipliersCache = functions.broadcast(uniqueMultipliers);
Dataset<Row> result = uniqueMultipliersCache.groupBy("multiplier").agg(functions.count(functions.lit(1)).alias("cnt")).orderBy("multiplier");
result.write().format("csv").option("header", true).mode("overwrite").save("hdfs://node1:8020/bigdata/spark/carsBet/result");
// result.cache();
// result.count();
//
// result.show();
//
// List<Row> list = result.collectAsList();
// list.forEach(row -> System.out.println(row.getLong(0) + ":" + row.getLong(1)));
}
}
Streaming Word Count
In the earlier Word Count, the data was fed to Spark all at once as a file (wikiOfSpark.txt), triggering a single Job. In the streaming Word Count, data is fed to Spark line by line, in small batches, and each batch of arriving data triggers a new Job.
Concretely, we use the netcat tool to send lines of data to a socket on local port 9999. The Spark streaming application listens on port 9999 and, as soon as data arrives, runs the computation logic, which here is Word Count, and then prints the result to the console.
Open a terminal and feed data on port 9999: on Linux, nc -lk 9999; on macOS, nc -l -p 9999
Scala implementation
import org.apache.spark.sql.DataFrame
// Host and port to listen on
val host: String = "127.0.0.1"
val port: String = "9999"
// Create a DataFrame from the socket source
var df: DataFrame = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.load()
// First split each received string on spaces into the word array words
df = df.withColumn("words", split($"value", " "))
// Flatten the words array into individual word rows
.withColumn("word", explode($"words"))
// Group by word
.groupBy("word")
// Count per group
.count()
df.writeStream
// Use the console as the sink
.format("console")
// Output options
.option("truncate", false)
// Output mode
.outputMode("complete")
//.outputMode("update")
// Start the streaming query
.start()
// Block until the query is terminated
.awaitTermination()
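After start(), type a few lines of words into the nc terminal; for each micro-batch, the console sink prints a table with word and count columns, and with outputMode("complete") it shows the full running counts accumulated so far.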
Java implementation
package com.mustafa.mynetty;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import java.util.concurrent.TimeoutException;
public class StreamingWordCount {
public static void main(String[] args) {
SparkSession session = SparkSession
.builder()
.appName("StreamingWordCount")
.master("local[*]")
.config("spark.executor.instances", "2")
.config("spark.cores.max", "6")
.config("spark.executor.cores", "3")
.config("spark.executor.memory", "2g")
.config("spark.memory.fraction", "0.9")
.config("spark.memory.storageFraction", "0.1")
.getOrCreate();
String host = "127.0.0.1";
String port = "9999";
Dataset<Row> df = session.readStream().format("socket").option("host", host).option("port", port).load();
df = df.withColumn("inputs", functions.split(df.col("value"), ","))
.withColumn("eventTime", functions.element_at(new Column("inputs"), 1).cast("timestamp"))
.withColumn("words", functions.split(functions.element_at(new Column("inputs"), 2), " "))
.withColumn("word", functions.explode(new Column("words")))
.withWatermark("eventTime", "10 minute")
.groupBy(functions.window(new Column("eventTime"), "5 minute"), new Column("word"))
.count();
Trigger trigger = Trigger.ProcessingTime(5000);
try {
df.writeStream()
.trigger(trigger)
.option("checkpointLocation", "hdfs://node1:8020/software/spark/check/" + Long.toString(System.currentTimeMillis()))
.format("console")
.option("truncate", "false")
.outputMode("update")
.start()
.awaitTermination();
} catch (StreamingQueryException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
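Note that this Java variant, unlike the Scala one, expects each input line in the form <event time>,<text>: it splits on the comma, casts the first field to a timestamp, counts words in 5-minute event-time windows, and discards data older than the 10-minute watermark. The lines typed into nc should therefore look like the following (timestamps are illustrative):
2024-01-01 10:01:00,Apache Spark Spark
2024-01-01 10:02:30,Spark Streaming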
Running on the cluster
spark-submit \
--master spark://node0:7077 \
--deploy-mode cluster \
--class com.mustafa.mynetty.CarsBet \
hdfs://node1:8020/software/spark-demo-1.0-SNAPSHOT.jar
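Before submitting, the application jar is assumed to have been packaged (for example with mvn package) and uploaded to the HDFS path shown above; the class name matches the CarsBet example from earlier.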
Appendix
Uploading the files to HDFS (for the license-plate lottery analysis)
package com.mustafa.mynetty;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
public class CarsBetUploader {
private static FileSystem getFileSystem() throws IOException {
Configuration conf = new Configuration();
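// Assumes core-site.xml is on the classpath so that fs.defaultFS resolves to hdfs://node1:8020 (see the Hadoop configuration below)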
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem;
}
private static void uploadHdfsFile(File localFile, String targetPath) throws IOException {
FileSystem fileSystem = getFileSystem();
fileSystem.delete(new Path(targetPath), true);
try (FileInputStream inStream = new FileInputStream(localFile); FSDataOutputStream outStream = fileSystem.create(new Path(targetPath))) {
IOUtils.copy(inStream, outStream, 4096);
}
}
private static void readHdfsFile(String hdfsFilePath) throws IOException {
// Obtain the HDFS file system handle
FileSystem fileSystem = getFileSystem();
Path path = new Path(hdfsFilePath);
// Open the path through the FileSystem API and copy its contents to standard output
try (FSDataInputStream inStream = fileSystem.open(path)) {
IOUtils.copy(inStream, System.out, 4096);
}
}
public static void main(String[] args) throws Exception {
// String hdfsFilePath = "hdfs://node1:8020/bigdata/spark/wordCount/datas/wikiOfSpark.txt";
// readHdfsFile(hdfsFilePath);
String sourceRootPath = "/Users/mustafa/Documents/databases/bigdata/spark/spark_sql";
String targetRootPath = "hdfs://node1:8020/bigdata/spark/carsBet/datas";
String[] subFolders = new String[]{"apply", "lucky"};
for (String subFolder : subFolders) {
String sourcePath = sourceRootPath + "/" + subFolder;
String targetPath = targetRootPath + "/" + subFolder;
File sourcePathFile = new File(sourcePath);
File[] sourceList = sourcePathFile.listFiles();
for (File source : sourceList) {
if (source.isDirectory() && source.getName().startsWith("batchNum=")) {
String target = targetPath + "/" + source.getName();
File[] sourcePartitionsList = source.listFiles();
Arrays.asList(sourcePartitionsList).parallelStream().forEach(sourcePartition -> {
if (sourcePartition.isFile() && sourcePartition.getName().startsWith("part-")) {
String targetPartition = target + "/" + sourcePartition.getName();
System.out.println("upload " + targetPartition);
try {
uploadHdfsFile(sourcePartition, targetPartition);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
}
}
}
Hadoop HDFS configuration
core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://node1:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/software/hadoop/temp</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>86400</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
</configuration>
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>myserver2:9870</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>myserver2:9868</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
</configuration>
hadoop-env.sh
export LANG=en_US.UTF-8
export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)}
export JAVA_HOME=/opt/software/jdk
export HADOOP_HOME=/opt/software/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_LOG_DIR=$HADOOP_HOME/logs
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
Spark configuration
spark-defaults.conf
spark.master spark://192.168.31.125:7077
spark.local.dir /opt/software/spark/temp
spark.sql.autoBroadcastJoinThreshold 1g
spark.sql.adaptive.enabled true
spark.ui.port 4040
spark.executor.cores 3
spark.executor.memory 2g
spark.memory.fraction 0.9
spark.memory.storageFraction 0.1
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:8020/software/spark/history
spark.driver.memory 1g
spark-env.sh
#!/usr/bin/env bash
SPARK_MASTER_HOST=node0
HADOOP_CONF_DIR=/opt/software/hadoop/etc/hadoop
SPARK_EXECUTOR_CORES=3
SPARK_EXECUTOR_MEMORY=2g
JAVA_HOME=/opt/software/jdk
SPARK_HOME=/opt/software/spark
That wraps up this walkthrough of classic Spark demos implemented in both Scala and Java.