用idea工具scala 和 Java开发 spark案例:WordCount

这篇具有很好参考价值的文章主要介绍了用idea工具scala 和 Java开发 spark案例:WordCount。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一 环境准备

二 scala代码编写

三 java 代码编写


一 环境准备

        创建一个 maven 工程

        添加下列依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-graphx_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.62</version>
    </dependency>

        原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改

        

二 scala代码编写

        首先准备好数据,即一个 txt 文本里面加一些单词,可以放在 hdfs 或本地或其它地方,读取的时候注意改代码,这里是读取 hdfs 上的 txt 文本,注意改成自己的地址

         新建一个 scala 的 object,编写代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object WordCountDemo {
  def main(args: Array[String]): Unit = {
    val conf : SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
    val sc : SparkContext = SparkContext.getOrCreate(conf)

    var spark : SparkSession = SparkSession.builder().config(conf).getOrCreate()

//    val rdd1: RDD[String] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt")
//    val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))
//    val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
//    val result: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)

    val result2: RDD[(String, Int)] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
    //打印到 console
    //    result2.glom().collect.foreach(x=>println(x.toList))
    //保存到 hdfs
    result2.saveAsTextFile("hdfs://101.200.63.3:9000/kb23/sparkoutput/wordcount")
  }

}

        这里稍微解释一下代码中的一些函数:

        map:转换函数,数据集合中每个元素进行一次我们定义的方法

        flatMap: 与map类似,但是映射为0个或多个

        collect:以数组的形式返回数据集中的所有元素 

        glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

 

        云服务器的朋友可能有的报错

22/05/0305:48:53 WARN DFSClient: Failed to connect to /10.0.24.10:9866 for block, add to deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]

        出现这种错误看字面意思就很容易明白,这是本地与 datanode 通信时,namenode 给的是 datanode 的内网 ip,所以本地找不到

        解决方法也很简单,设置一下让 namenode 传过来的是服务器名而不是 ip

        在 idea 中,resource 文件夹中添加文件 hdfs-site.xml

        hdfs-site.xml内容:

<!-- datanode 通信是否使用域名,默认为false,改为true -->
    <property>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>
        <description>Whether datanodes should use datanode hostnames whenconnecting to other datanodes for data transfer.
        </description>
    </property>

三 java 代码编写

        这里原数据存储在本地,文件名为 input.txt文章来源地址https://www.toymoban.com/news/detail-724094.html

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Map;

public class WordCount {
    public static void main(String[] args) {
        // 创建SparkConf对象
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local");

        // 创建JavaSparkContext对象
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 读取文本文件
        JavaRDD<String> lines = sc.textFile("input.txt");

        // 计算单词出现次数
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaRDD<String> filteredWords = words.filter(word -> !word.isEmpty());
        JavaPairRDD<String, Integer> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((x, y) -> x + y);
        Map<String, Integer> wordCountsMap = wordCounts.collectAsMap();

        // 输出结果
        for (Map.Entry<String, Integer> entry : wordCountsMap.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        // 关闭JavaSparkContext对象
        sc.close();

    }
}

到了这里,关于用idea工具scala 和 Java开发 spark案例:WordCount的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java项目使用intellij-IDEA查看依赖包版本是否有冲突(方法及工具)附截图

    编译器及版本 idea-ultimate 依赖管理工具 maven Step1:点击右侧的maven Step2:右键依赖项,点击分析依赖关系 Step3:可以在模块名位置进行切换,左侧三角的标志则表示该包引入了多个版本,有冲突 Step4:可以看到当前这个包被引入了两个版本的 Step5:右键冲突的包名,可以看到

    2024年02月15日
    浏览(74)
  • 2023_Spark_实验三:基于IDEA开发Scala例子

    一、创建一个空项目,作为整个项目的基本框架 二、创建SparkStudy模块,用于学习基本的Spark基础 三、创建项目结构 1、在SparkStudy模块下的pom.xml文件中加入对应的依赖,并等待依赖包下载完毕。 在pom.xml文件中加入对应的依赖 等待依赖包下载完毕 2、若不能自动下载依赖包,

    2024年02月10日
    浏览(29)
  • 2023_Spark_实验六:Scala面向对象部分演示(二)(IDEA开发)

    7、Scala中的apply方法() 遇到如下形式的表达式时,apply方法就会被调用: Object(参数1,参数2,......,参数N) 通常,这样一个apply方法返回的是伴生类的对象;其作用是为了省略new Object的apply方法举例: 8、Scala中的继承 Scala和Java一样,使用extends扩展类。 案例一:

    2024年02月10日
    浏览(41)
  • 2023_Spark_实验五:Scala面向对象部分演示(一)(IDEA开发)

    1、面向对象的基本概念 把数据及对数据的操作方法放在一起,作为一个相互依存的整体——对象,面向 对象的三大特征:  封装  继承  多态 2、类的定义 简单类和无参方法 如果要开发main方法,需要将main方法定义在该类的伴生对象中,即:object对 象中,(后续做详细的讨

    2024年02月10日
    浏览(33)
  • 【用IDEA基于Scala2.12.18开发Spark 3.4.1 项目】

    打开IDEA后选址新建项目 选址sbt选项 配置JDK debug 解决方案 相关的依赖下载出问题多的话,可以关闭idea,重启再等等即可。 将sbt源设置为国内源 基于sbt添加依赖 spark-sql spark-core Spark sql simleapp代码如下: Spark RDD代码如下: 到此,基于Scala2.12.18开发Spark 3.4.1 项目完成。

    2024年02月14日
    浏览(26)
  • Spark-用IDEA编写wordcount demo

    Spark版本:3.2.0 Scala版本:2.12.12 JDK:1.8 Maven:3.6.3

    2024年02月15日
    浏览(29)
  • 4 | Java Spark实现 WordCount

    简单的 Java Spark 实现 WordCount 的教程,它将教您如何使用 Apache Spark 来统计文本文件中每个单词的出现次数。 首先,确保您已经安装了 Apache Spark 并设置了运行环境。您需要准备一个包含文本内容的文本文件,以便对其进行 WordCount 分析。

    2024年02月10日
    浏览(32)
  • 【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

    把DStream写入到MySQL数据库中 Spark 3.4.1 MySQL 8.0.30 sbt 1.9.2 需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL 提示:本项目通过sbt控制依赖 在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库

    2024年02月14日
    浏览(34)
  • 5 | Java Spark WordCount打成Jar 包测试

    步骤 1:准备 WordCount 代码 首先,确保 编写了 WordCount 代码,已经提供了正确的输入文件路径。

    2024年02月10日
    浏览(24)
  • Scala第二十章节(Akka并发编程框架、Akka入门案例、Akka定时任务代码实现、两个进程间通信的案例以及简易版spark通信框架案例)

    章节目标 理解Akka并发编程框架简介 掌握Akka入门案例 掌握Akka定时任务代码实现 掌握两个进程间通信的案例 掌握简易版spark通信框架案例 1. Akka并发编程框架简介 1.1 Akka概述 Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用工具包。Akka是使用scala开发的库,

    2024年04月11日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包