目录
一 环境准备
二 scala代码编写
三 java 代码编写
一 环境准备
创建一个 maven 工程
添加下列依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改
二 scala代码编写
首先准备好数据,即一个 txt 文本里面加一些单词,可以放在 hdfs 或本地或其它地方,读取的时候注意改代码,这里是读取 hdfs 上的 txt 文本,注意改成自己的地址
新建一个 scala 的 object,编写代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
object WordCountDemo {
def main(args: Array[String]): Unit = {
val conf : SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
val sc : SparkContext = SparkContext.getOrCreate(conf)
var spark : SparkSession = SparkSession.builder().config(conf).getOrCreate()
// val rdd1: RDD[String] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt")
// val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))
// val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
// val result: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)
val result2: RDD[(String, Int)] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
//打印到 console
// result2.glom().collect.foreach(x=>println(x.toList))
//保存到 hdfs
result2.saveAsTextFile("hdfs://101.200.63.3:9000/kb23/sparkoutput/wordcount")
}
}
这里稍微解释一下代码中的一些函数:
map:转换函数,数据集合中每个元素进行一次我们定义的方法
flatMap: 与map类似,但是映射为0个或多个
collect:以数组的形式返回数据集中的所有元素
glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。
云服务器的朋友可能有的报错
22/05/0305:48:53 WARN DFSClient: Failed to connect to /10.0.24.10:9866 for block, add to deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
出现这种错误看字面意思就很容易明白,这是本地与 datanode 通信时,namenode 给的是 datanode 的内网 ip,所以本地找不到
解决方法也很简单,设置一下让 namenode 传过来的是服务器名而不是 ip
在 idea 中,resource 文件夹中添加文件 hdfs-site.xml
hdfs-site.xml内容:文章来源:https://www.toymoban.com/news/detail-724094.html
<!-- datanode 通信是否使用域名,默认为false,改为true -->
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
<description>Whether datanodes should use datanode hostnames whenconnecting to other datanodes for data transfer.
</description>
</property>
三 java 代码编写
这里原数据存储在本地,文件名为 input.txt文章来源地址https://www.toymoban.com/news/detail-724094.html
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Map;
public class WordCount {
public static void main(String[] args) {
// 创建SparkConf对象
SparkConf conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local");
// 创建JavaSparkContext对象
JavaSparkContext sc = new JavaSparkContext(conf);
// 读取文本文件
JavaRDD<String> lines = sc.textFile("input.txt");
// 计算单词出现次数
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaRDD<String> filteredWords = words.filter(word -> !word.isEmpty());
JavaPairRDD<String, Integer> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((x, y) -> x + y);
Map<String, Integer> wordCountsMap = wordCounts.collectAsMap();
// 输出结果
for (Map.Entry<String, Integer> entry : wordCountsMap.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
// 关闭JavaSparkContext对象
sc.close();
}
}
到了这里,关于用idea工具scala 和 Java开发 spark案例:WordCount的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!