简单使用Spark、Scala完成对天气数据的指标统计

这篇具有很好参考价值的文章主要介绍了简单使用Spark、Scala完成对天气数据的指标统计。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、前言

&  什么是Spark?

&  什么是Scala

二、数据准备(数据类型的转换)

三、Spark部分

1、使用Spark完成数据中的“风级”,“风向”、“天气情况”相关指标统计及筛选

四、Scala部分

1、使用Scala统计某月、全年的温差、平均气温以及最值等相关的指标

五、遇到的问题:

六、总结


一、前言

&  什么是Spark?

Spark最初由美国加州伯克利大学(UCBerkeley)的AMP(Algorithms, Machines and People)实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。Spark在诞生之初属于研究性项目,其诸多核心理念均源自学术研究论文。2013年,Spark加入Apache孵化器项目后,开始获得迅猛的发展,如今已成为Apache软件基金会最重要的三大分布式计算系统开源项目之一(即Hadoop、Spark、Storm)

&  什么是Scala

Scala是一门多范式的编程语言,一种类似java的编程语言,设计初衷是实现可伸缩的语言 、并集成面向对象编程和函数式编程的各种特性。

---------------------------------------------------------------------------------------------------------------------------------

二、数据准备(数据类型的转换)

将天气数据进行转换,csv转json文件,相关代码如下:

(使用的python,相对简单)

代码如下:

import csv
import json
import chardet

csvFilePath = 'weather.csv'
jsonFilePath = 'weather.json'

# 检测文件编码
with open(csvFilePath, 'rb') as file:
    result = chardet.detect(file.read())
encoding = result['encoding']

# 读取 CSV 文件并处理非 UTF-8 字符
with open(csvFilePath, 'r', encoding=encoding, errors='replace') as csvFile:
    csvDict = csv.DictReader(csvFile)

    jsonData = json.dumps([row for row in csvDict], ensure_ascii=False)

# 将 JSON 数据写入文件
with open(jsonFilePath, 'w', encoding='utf-8') as jsonFile:
    jsonFile.write(jsonData)

转后的数据如下(仅展示部分数据):

简单使用spark、scala完成对天气数据的指标统计的数据,spark,scala,大数据

---------------------------------------------------------------------------------------------------------------------------------

建类:

import java.io.Serializable;

public class Weather implements Serializable {
    private String date;//日期
    private String week;//星期
    private String weather;//天气情况
    private String min_temperature;//最低温度
    private String max_temperature;//最高温度
    private String wind_direction;//风向
    private String wind_scale;//风力等级

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getWeek() {
        return week;
    }

    public void setWeek(String week) {
        this.week = week;
    }

    public String getWeather() {
        return weather;
    }

    public void setWeather(String weather) {
        this.weather = weather;
    }

    public String getMin_temperature() {
        return min_temperature;
    }

    public void setMin_temperature(String min_temperature) {
        this.min_temperature = min_temperature;
    }

    public String getMax_temperature() {
        return max_temperature;
    }

    public void setMax_temperature(String max_temperature) {
        this.max_temperature = max_temperature;
    }

    public String getWind_direction() {
        return wind_direction;
    }

    public void setWind_direction(String wind_direction) {
        this.wind_direction = wind_direction;
    }

    public String getWind_scale() {
        return wind_scale;
    }

    public void setWind_scale(String wind_scale) {
        this.wind_scale = wind_scale;
    }
}

---------------------------------------------------------------------------------------------------------------------------------

三、Spark部分

1、使用Spark完成数据中的“风级”,“风向”、“天气情况”相关指标统计及筛选

指标:风级、风向

代码如下:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

public class WeatherAnalysis {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("WeatherAnalysis")
                .master("local").getOrCreate();

        SparkConf conf = new SparkConf().setAppName("Weather2").setMaster("local");
        // 读取json数据
        Dataset<Row> weatherData = spark.read().json("D:\\weather.json");

        // 1. 统计出现次数最多的“风级“数量,降序,并输出控制台
        Dataset<Row> windScaleCount = weatherData.groupBy("wind_scale")
                .count().sort(desc("count"));
        windScaleCount.show();

        // 2. 统计出现次数最多的“风向“数量,降序,并输出控制台
        Dataset<Row> windDirectionCount = weatherData.groupBy("wind_direction")
                .count().sort(desc("count"));
        windDirectionCount.show();

        // 3. 筛选出风级等于2级且风向为“西北风” 的天气数据,并输出控制台
        weatherData.where(" wind_scale = '2级' and wind_direction = '西北风'").show();
    }
}

运行结果如下:

简单使用spark、scala完成对天气数据的指标统计的数据,spark,scala,大数据

简单使用spark、scala完成对天气数据的指标统计的数据,spark,scala,大数据

简单使用spark、scala完成对天气数据的指标统计的数据,spark,scala,大数据

指标:天气情况

代码如下:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.JavaSparkContext;
import static org.apache.spark.sql.functions.*;


    public class Weather2 {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("Weather2").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SparkSession spark = SparkSession.builder().appName("Weather2").master("local").getOrCreate();

            // 读取json数据
            Dataset<Row> weatherDF = spark.read().json("D:\\weather.json");

            //4、统计一年的各种“天气“情况出现频数,并输出控制台
            Dataset<Row> windDirectionCount = weatherDF.groupBy("weather")
                    .count().sort(desc("count"));
            windDirectionCount.show();

            // 5. 统计一年的“天气“情况为”晴“的出现天数,并输出控制台。
            long sunnyDays = weatherDF.filter(col("weather").equalTo("晴")).count();
            System.out.println("一年的“天气“情况为”晴“的天数有: \n" + sunnyDays + "天");

        }
    }

运行结果如下:

简单使用spark、scala完成对天气数据的指标统计的数据,spark,scala,大数据简单使用spark、scala完成对天气数据的指标统计的数据,spark,scala,大数据

---------------------------------------------------------------------------------------------------------------------------------

四、Scala部分

1、使用Scala统计某月、全年的温差、平均气温以及最值等相关的指标

指标:温差、平均温、最值

代码如下:

kage scala_weather

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object Weather1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("WeatherAnalysis")
      .master("local")
      .getOrCreate()

    // 读取json数据
    val weatherDF = spark.read.json("D:\\weather.json")
    weatherDF.show()

      // 6. 统计3月份每天的温差,以及平均温度
      val marchWeatherDF = weatherDF.filter(month(to_date(col("date"), "yyyy/MM/dd")) === 3)

      val marchWeatherDiffDF = marchWeatherDF
        .withColumn("min_temp", regexp_replace(col("min_temperature"), "℃", "").cast(DoubleType))
        .withColumn("max_temp", regexp_replace(col("max_temperature"), "℃", "").cast(DoubleType))
        .withColumn("temp_diff", col("max_temp") - col("min_temp"))
        .groupBy("date")
        .agg(round(avg("min_temp"), 2).alias("avg_min_temp"), round(avg("max_temp"), 2).alias("avg_max_temp"), round(avg("temp_diff"), 2).alias("avg_temp_diff"))
        .orderBy("date")

      println("3月份每天的温差以及平均温度:")
      marchWeatherDiffDF.show()

      // 7. 统计全年的温差
      val yearWeatherDiffDF = weatherDF
        .withColumn("min_temp", regexp_replace(col("min_temperature"), "℃", "").cast(DoubleType))
        .withColumn("max_temp", regexp_replace(col("max_temperature"), "℃", "").cast(DoubleType))
        .withColumn("temp_diff", col("max_temp") - col("min_temp"))
        .agg(round(avg("temp_diff"), 2).alias("avg_temp_diff"))

      println("全年的温差平均值:")
      yearWeatherDiffDF.show()

       //8. 统计1月份每天的最高气温
      val januaryWeatherDF = weatherDF.filter(month(to_date(col("date"), "yyyy/MM/dd")) === 1)
      val januaryMaxTempDF = januaryWeatherDF
      .withColumn("max_temp", regexp_replace(col("max_temperature"), "℃", "").cast(DoubleType))
      .select("date","max_temp")
      .orderBy("date","max_temp")
      .orderBy("date")

      println("1月份每天的最高气温:")
      januaryMaxTempDF.show()

      spark.stop()
      }
}

运行结果如下:

简单使用spark、scala完成对天气数据的指标统计的数据,spark,scala,大数据简单使用spark、scala完成对天气数据的指标统计的数据,spark,scala,大数据简单使用spark、scala完成对天气数据的指标统计的数据,spark,scala,大数据

---------------------------------------------------------------------------------------------------------------------------------

五、遇到的问题:

1、json文件转换成功,但是使用ider进行数据的读取时,返回显示无法解析的错误

(使用了3种方法进行文件的转换。。。以下展示2种文件格式对比)

解决

// 1  ider可以解析的json文件的样式:

[{"date": "2019/1/1", "week": " 星期二 ", "weather": "晴", "min_temperature": "1℃", "max_temperature": "6℃", "wind_direction": "西北风", "wind_scale": "2级"}

// 2  无法解析的json文件样式:(看着样式和上面的区别不大,但是我运行时提示无法解析。。。不太懂。。。可能空格问题?)

[
    {
        "weather": "晴",
        "date": "2019/1/1",
        "week": " 星期二 ",
        "min_temperature": "1℃",
        "max_temperature": "6℃",
        "wind_scale": "2级",
        "wind_direction": "西北风"
    },

2、尝试读取json文件时,返回文件路径不存在的问题(使用相对路径绝对路径均无用)

解决我把json文件移动到根目录下,就成功读取到了。。。

3、当创建Scala类时,找不到创建Scala类或者Scala项目的选项(Scala已经安装并已完全部署好、插件、包均已导入)

解决打开“文件”--->“项目结构”--->“平台设置”---->“全局库”,把Scala包再重新导入就可以啦

4、 编写Scala代码来统计气温的时候,使用 "$" 符号,提示 $不是StringContext的成员(插件已经安装,Scala包也已经导入)

解决: "$" 换成col,再将字段()起来就可以啦

---------------------------------------------------------------------------------------------------------------------------------

六、总结

        学习Spark和Scala编程可以帮助我们处理大规模数据,进行数据分析。使用Spark和Scala编写程序可以提高数据处理的效率和灵活性,同时还能够充分发挥分布式计算的优势。通过学习这两门技术,我们可以更好地理解数据处理的流程和原理,并且可以应用到实际的数据分析和统计工作中。总而言之,学习Spark和Scala编程是提升数据处理能力和数据分析技能的重要途径。文章来源地址https://www.toymoban.com/news/detail-770937.html

到了这里,关于简单使用Spark、Scala完成对天气数据的指标统计的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark 读写 es 数据(scala 版)

    读取 hdfs 文件 解析采用 fast-json : 1、 pom.xml 2、 main 文件 运行结果: 1、 pom.xml 2、 main 文件 参考文章 Spark读写ES数据时遇到的问题总结 Spark读写ES 使用Apache Spark将数据写入ElasticSearch

    2024年02月11日
    浏览(39)
  • spark底层为什么选择使用scala语言开发

    基于Scala的语言特性 集成性:Scala 是一种运行在 Java 虚拟机(JVM)上的静态类型编程语言,可以与 Java 代码无缝集成。由于 Spark 涉及到与大量 Java 生态系统的交互,例如 Hadoop、Hive 等,使用 Scala 可以方便地与这些组件进行集成和交互。 函数式编程支持:Scala 是一种面向函数

    2024年02月10日
    浏览(52)
  • 大数据Spark SparkSession的3种创建方式 Scala语言实现

    SparkSession是Apache Spark 2.0版本引入的一个编程接口,用于与Spark进行交互。它是Spark应用程序的入口点,提供了一种方便的方式来创建DataFrame、DataSet和SQLContext等数据结构,并且可以配置各种Spark应用程序的选项。SparkSession还管理了Spark应用程序的运行环境,包括Spark集群的连接,

    2023年04月20日
    浏览(35)
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-1)

    Spark是专为大规模数据处理而设计的快速通用的计算引擎,它是由Scala语言开发实现的,关于大数据技术,本身就是计算数据,而Scala既有面向对象组织项目工程的能力,又具备计算数据的功能,同时Spark和Scala的紧密集成,本书将采用Scala语言开发Spark程序,所以学好Scala将有助

    2024年02月11日
    浏览(61)
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    对于每一门编程语言来说,数组(Array)都是重要的数据结构之一,主要用来存储数据类型相同的元素。Scala中的数组分为定长数组和变长数组,定义定长数组,需要使用new,而定义变长数组时,则需要导包 import scala.collection.mutable.ArrayBuffer 。 数组(Array)主要用来存储

    2024年02月10日
    浏览(60)
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-2)

    Spark是专为大规模数据处理而设计的快速通用的计算引擎,它是由Scala语言开发实现的,关于大数据技术,本身就是计算数据,而Scala既有面向对象组织项目工程的能力,又具备计算数据的功能,同时Spark和Scala的紧密集成,本书将采用Scala语言开发Spark程序,所以学好Scala将有助

    2024年02月11日
    浏览(58)
  • 大数据开发之Hive(统计影音视频网站的常规指标)

    1、视频表 字段 备注 详细描述 videoId 视频唯一id(String) 11位字符串 uploader 视频上传者(String) 上传视频的用户名String age 视频年龄(int) 视频在平台上的整天数 category 视频类别(Array) 上传视频指定的视频分类 length 视频长度(Int) 整形数字标识的视频长度 views 观看次数(Int) 视频被浏

    2024年01月19日
    浏览(42)
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“t”键分割,数据内容及数据格式如下: 项目环境说明 开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。 1、新创一个文件folder命名为li

    2024年02月13日
    浏览(55)
  • Linux CentOS下大数据环境搭建(zookeeper+hadoop+hbase+spark+scala)

    本篇文章是结合我个人学习经历所写,如果遇到什么问题或者我有什么错误,欢迎讨论。 百度网盘链接:https://pan.baidu.com/s/1DCkQQVYqYHYtPws9hWGpgw?pwd=zh1y 提取码:zh1y 软件在连接中VMwareWorkstation_V16.2.1_XiTongZhiJia的文件夹下。 双击运行安装包,这里下一步即可。 这里勾选我接受许可

    2024年04月15日
    浏览(66)
  • 构建大数据环境:Hadoop、MySQL、Hive、Scala和Spark的安装与配置

    在当今的数据驱动时代,构建一个强大的大数据环境对于企业和组织来说至关重要。本文将介绍如何安装和配置Hadoop、MySQL、Hive、Scala和Spark,以搭建一个完整的大数据环境。 安装Hadoop 首先,从Apache Hadoop的官方网站下载所需的Hadoop发行版。选择适合你系统的二进制发行版,下

    2024年02月11日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包