Spark—通过Java、Scala API实现WordCount案例的基本操作

这篇具有很好参考价值的文章主要介绍了Spark—通过Java、Scala API实现WordCount案例的基本操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

实验原理

Spark的核心就是RDD,所有在RDD上的操作会被运行在Cluster上,Driver程序启动很多Workers,Workers在(分布式)文件系统中读取数据后转化为RDD(弹性分布式数据集),然后对RDD在内存中进行缓存和计算。
Spark—通过Java、Scala API实现WordCount案例的基本操作,spark,java,scala
而RDD有两种类型的操作 ,分别是Action(返回values)和Transformations(返回一个新的RDD)。
Spark—通过Java、Scala API实现WordCount案例的基本操作,spark,java,scala

一、数据展示与前置准备

某电商网站记录了大量用户对商品的收藏数据,并将数据存储在名为buyer_favorite1的文件中,数据格式以及数据内容如下
Spark—通过Java、Scala API实现WordCount案例的基本操作,spark,java,scala
在进行后续操作前,请先开启hadoop和spark服务。可以通过jps命令查看进程是否开启完整。

二、创建scala工程项目

1、开发环境:eclipse

打开已安装完Scala插件的Eclipse,新建一个Scala项目,命名为spark4。
在spark4项目下新建包名,命名为my.scala。将scala object命名为ScalaWordCount。
Spark—通过Java、Scala API实现WordCount案例的基本操作,spark,java,scala

2、导入运行所需要的jar包。

右键项目,创建一个文件夹,名为lib。
将jar包导入进来,再右键jar包,点击Build Path=>Add to Build Path。(可以去我的资源里面下载spark1.x hadoop2.x)

3、编写Scala语句,并统计用户收藏数据中,每个用户收藏商品数量。
package my.scala  
import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
object ScalaWordCount {  
    def main(args: Array[String]) {  
	    //创建Spark的配置对象sparkConf,设置Spark程序运行时的配置信息; 
        val conf = new SparkConf()  
        conf.setMaster("local")  .setAppName("scalawordcount")
		//创建SparkContext对象,SparkContext是Spark程序所有功能的唯一入口,无论采用Scala、Java还是Python都必须有一个SparkContext;  
        val sc = new SparkContext(conf)   
        val rdd = sc.textFile("hdfs://localhost:9000/myspark/buyer_favorite1")   //根据具体的数据来源,通过SparkContext来创建RDD;
        //对初始的RDD进行Transformation级别的处理。(首先将每一行的字符串拆分成单个的单词,然后在单词拆分的基础上对每个单词实例计数为1;
        //最后,在每个单词实例计数为1的基础上统计每个单词在文件出现的总次数)。
        rdd.map(line => (line.split("\t")(0), 1))  
           .reduceByKey(_ + _)  
           .collect()  
           .foreach(println)  
        sc.stop()  
    }  
}  

在控制界面console中查看的输出结果。Spark—通过Java、Scala API实现WordCount案例的基本操作,spark,java,scala

三、创建Java工程项目

再次右键点击项目名,新建package,将包命名为my.java 。
右键点击包my.java,新建Class,命名为JavaWordCount。

1、编写Java代码,统计用户收藏数据中,每个用户收藏商品数量。
package my.java;  
import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import org.apache.spark.api.java.function.FlatMapFunction;  
import org.apache.spark.api.java.function.Function2;  
import org.apache.spark.api.java.function.PairFunction;  
import scala.Tuple2;  
import java.util.Arrays;  
import java.util.List;  
import java.util.regex.Pattern;  
public final class JavaWordCount {  
 private static final Pattern SPACE = Pattern.compile("\t");  
 public static void main(String[] args) throws Exception {  
   SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JavaWordCount");  
   JavaSparkContext ctx = new JavaSparkContext(sparkConf);  
   JavaRDD<String> lines = ctx.textFile("hdfs://localhost:9000/myspark/buyer_favorite1");  
   JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {  
        @Override  
        public Iterable<String> call(String s) {  
            String word[]=s.split("\t",2);  
            return Arrays.asList(word[0]);  
        }  
    });  
   JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {  
        @Override  
        public Tuple2<String, Integer> call(String s) {  
            return new Tuple2<String, Integer>(s, 1);  
        }  
   });  
   JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {  
        @Override  
        public Integer call(Integer i1, Integer i2) {  
            return i1 + i2;  
            }  
        });  
   List<Tuple2<String, Integer>> output = counts.collect();  
   System.out.println(counts.collect());  
   counts.saveAsTextFile("hdfs://localhost:9000/myspark/out");  
   ctx.stop();  
  }  
}
2、在linux终端查看输出结果

执行如下命令查看结果,前提是已启动集群

hadoop fs -cat /myspark/out/part-00000  
写在最后

由此可以看出,scala语言在编写spark程序时的优越性,简短精炼。文章来源地址https://www.toymoban.com/news/detail-553286.html

到了这里,关于Spark—通过Java、Scala API实现WordCount案例的基本操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spring boot java项目整合Scala&Spark,接口api调用方式调用scala代码,配置分享

    版本说明: spring boot: 2.5.9 jdk:1.8 spark:2.4.5 sclala:2.11.12 首先你需要有一个完美的spring boot项目(java版本)能成功运行,这就不赘述了,按照网上的自己搭建吧,然后重要的来了,我捣鼓了两天时间,各样的报错见过了,网上的处理方法要嘛是不全,要嘛是没有用,各种办

    2024年02月10日
    浏览(42)
  • spark 经典demo 的 scala 和 java 实现

    💐💐扫码关注公众号,回复 spark 下载geekbang 原价 90 元 零基础入门 Spark 学习资料💐💐 要先对文件中的单词做统计计数,然后再打印出频次最高的 5 个单词,江湖人称“Word Count”wikiOfSpark.txt 文件下载地址:这里 scala 实现 java实现  为了限制机动车保有量,从 2011 年

    2024年04月27日
    浏览(20)
  • Scala第二十章节(Akka并发编程框架、Akka入门案例、Akka定时任务代码实现、两个进程间通信的案例以及简易版spark通信框架案例)

    章节目标 理解Akka并发编程框架简介 掌握Akka入门案例 掌握Akka定时任务代码实现 掌握两个进程间通信的案例 掌握简易版spark通信框架案例 1. Akka并发编程框架简介 1.1 Akka概述 Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用工具包。Akka是使用scala开发的库,

    2024年04月11日
    浏览(33)
  • 5 | Java Spark WordCount打成Jar 包测试

    步骤 1:准备 WordCount 代码 首先,确保 编写了 WordCount 代码,已经提供了正确的输入文件路径。

    2024年02月10日
    浏览(24)
  • YARN On Mapreduce搭建与wordCount案例实现

    YARN的基本思想是将资源管理RM,和作业调度、监控功能拆分成单独的守护进程。这个思想中拥有一个全局的资源管理器以及每个应用的MASTER,AM。每一个应用 都是单个作业或者一个DAG作业。 架构图: mapred-site.xml yarn-site.xml 配置节点分发到其他节点。 启动yarn 启动rm资源管理 访

    2023年04月24日
    浏览(28)
  • IDEA上面书写wordcount的Scala文件具体操作

     IDEA创建项目的操作步骤以及在虚拟机里面创建Scala的项目简单介绍_intellij 创建scala 目录 系列文章目录 1、编写Scala程序 2、更换pom.xml文件 3、更新Maven的依赖文件 4、执行代码即可 总结 本文主要在上述文章的基础上编辑和创建一个WordCount应用程序 创建wordcount项目 选择任意位

    2024年02月11日
    浏览(30)
  • Spark项目Java和Scala混合打包编译

    实际开发用有时候引用自己写的一些java工具类,但是整个项目是scala开发的spark程序,在项目打包时需要考虑到java和scala混合在一起编译。 今天看到之前很久之前写的一些打包编译文章,发现很多地方不太对,于是重新整理更新如下。 我们的项目结构可能如下图,既包含ja

    2024年02月11日
    浏览(27)
  • 【spark】java类在spark中的传递,scala object在spark中的传递

    记录一个比较典型的问题,先讲一下背景,有这么一个用java写的类 然后在spark中使用的时候: 原因: scala的object对应的就是java的静态成员,可以反过来理解java的所有静态成员可被抽取成伴生对象(虽然现实中是scala最终编译成java)。以上面的JavaClass0 例子可理解为等价的

    2024年02月11日
    浏览(26)
  • 如何在Spark Scala/Java应用中调用Python脚本

    本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同 1.PythonRunner 对于运行与 JVM 上的程序(即Scala、Java程序),Spark 提供了 PythonRunner 类。只需要调用PythonRunner 的main方法,就可以在Scala或Java程序中调用Python脚本。在实现上,PythonRunner 基于

    2023年04月24日
    浏览(40)
  • 大数据Spark SparkSession的3种创建方式 Scala语言实现

    SparkSession是Apache Spark 2.0版本引入的一个编程接口,用于与Spark进行交互。它是Spark应用程序的入口点,提供了一种方便的方式来创建DataFrame、DataSet和SQLContext等数据结构,并且可以配置各种Spark应用程序的选项。SparkSession还管理了Spark应用程序的运行环境,包括Spark集群的连接,

    2023年04月20日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包