大数据开源框架之基于Spark的气象数据处理与分析

这篇具有很好参考价值的文章主要介绍了大数据开源框架之基于Spark的气象数据处理与分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark配置请看:

(30条消息) 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署_木子一个Lee的博客-CSDN博客

目录

实验说明:

实验要求:

实验步骤:

数据获取:

数据分析:

可视化:

参考代码(适用于python3):

运行结果:


实验说明:

        本次实验所采用的数据,从中央气象台官方网站(网址:http://www.nmc.cn/)爬取,主要是最近24小时各个城市的天气数据,包括时间整点、整点气温、整点降水量、风力、整点气压、相对湿度等。正常情况每个城市对应24条数据(每个整点一条)。数据规模达到2412个城市,57888条数据,有部分城市部分时间点数据存在缺失或异常。特别说明:实验所用数据均为网上爬取,没有得到中央气象台官方授权使用,使用范围仅限本次实验使用,请勿用于商业用途。 

实验要求:

1.数据获取,最后保存的各个城市最近24小时整点天气数据(passed_weather_ALL.csv)每条数据各字段含义如下所示,这里仅列出实验中使用部分:

字段 含义

字段 含义

province 城市所在省份(中文)

province 城市所在省份(中文)

city_index 城市序号(计数)

city_index 城市序号(计数)

city_name 城市名称(中文)

city_name 城市名称(中文)

city_code 城市编号

city_code 城市编号

time 时间点(整点)

time 时间点(整点)

temperature 气温

temperature 气温

rain1h 过去1小时降雨量;

rain1h 过去1小时降雨量;

2. 数据分析,主要使用Spark SQL相关知识与技术,对各个城市过去24小时累积降雨量和当日平均气温进行计算和排序;

3. 数据可视化,数据可视化使用python matplotlib库,版本号1.5.1。可使用pip命令安装。绘制过程大体如下:

第一步,应当设置字体,这里提供了黑体的字体文件simhei.tff。否则坐标轴等出现中文的地方是乱码。

第二步,设置数据(累积雨量或者日平均气温)和横轴坐标(城市名称),配置直方图。

第三步,配置横轴坐标位置,设置纵轴坐标范围

第四步,配置横纵坐标标签

第五步,配置每个条形图上方显示的数据

第六步,根据上述配置,画出直方图。。

根据上述实验任务,设计相应内容与具体执行步骤,并对相应关键步骤的执行结果给出截图。 

实验步骤:

数据获取:

思路:

首先利用urllib.request获取url的数据,然后利用json.loads变为json格式

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

再编写函数写入表头和数据:

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

利用上述函数组合,编写两个get函数获取城市和省份,导出CSV文件:

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

最后获取天气数据,导出passed_weather_ALL.csv

每个字段获取方式是:

city_code就是city.csv的code,province就是city.csv里边的province,city_name就是city.csv里边的city,city_index就是第几个城市(设置count变量计数,每个城市加1),

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

其他直接通过爬取表头获得:

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

在主函数里运行:

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

部分代码:

def get_passed_weather(self,province):
        weather_passed_file = 'input/passed_weather_' + province + '.csv'
        if os.path.exists(weather_passed_file):
            return
        passed_weather = list()
        count = 0
        if province == 'ALL':
            print ("开始爬取过去的天气状况")
            for city in self.get_cities():
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/'+city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                        item['city_index'] = str(count)
                    passed_weather.extend(data)
                if count % 50 == 0:
                    if count == 50:
                        self.write_header(weather_passed_file,passed_weather)
                    else:
                        self.write_row(weather_passed_file,passed_weather)
                    passed_weather = list()
            if passed_weather:
                if count <= 50:
                    self.write_header(weather_passed_file,passed_weather)
                else:
                    self.write_row(weather_passed_file,passed_weather)
            print ("爬取过去的天气状况完毕!")
        else:
            print ("开始爬取过去的天气状况")
            select_city = filter(lambda x:x['province']==province,self.get_cities())
            for city in select_city:
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/'+city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_index'] = str(count)
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                    passed_weather.extend(data)
            self.write_csv(weather_passed_file,passed_weather)
            print ("爬取过去的天气状况完毕!")
 
    def run(self,range = 'ALL'):
        self.get_passed_weather(range)

数据分析:

思路:

首先创建spark对象,然后使用select函数选择所需列的数据进行筛选,分组(累计降雨量按照省份、城市和城市代码分组,气温还得考虑时间date)求和、sort函数排序,

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

分析气温还需要进行筛选4个时刻,然后再进行分组求和排序

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

最后生成相应的csv或json文件,返回所需要的前20个或前10个数据。

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

部分代码:

def passed_rain_analyse(filename): #计算各个城市过去24小时累积雨量
    print ("开始分析累积降雨量")
    #spark = SparkSession.builder.master("spark://master:7077").appName("passed_rain_analyse").getOrCreate()
    #spark = SparkSession.builder.master("local[4]").appName("passed_rain_analyse").getOrCreate()
    spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate()
    
    df = spark.read.csv(filename,header = True)
    
    df_rain = df.select(df['province'],df['city_name'],df['city_code'],df['rain1h'].cast(DecimalType(scale=1)))        .filter(df['rain1h'] < 1000) #筛选数据,去除无效数据
    df_rain_sum = df_rain.groupBy("province","city_name","city_code")        .agg(F.sum("rain1h").alias("rain24h"))        .sort(F.desc("rain24h")) # 分组、求和、排序
    df_rain_sum.cache()
    df_rain_sum.coalesce(1).write.csv("file:///home/lee/lab5/passed_rain_analyse.csv")
    #spark.catalog.refreshTable(filename)
    print ("累积降雨量分析完毕!")
    return df_rain_sum.head(20)#前20个

def passed_temperature_analyse(filename):
    print ("开始分析气温")
    #spark = SparkSession.builder.master("spark://master:7077").appName("passed_temperature_analyse").getOrCreate()
    spark = SparkSession.builder.master("local").appName("passed_temperature_analyse").getOrCreate()
    #spark = SparkSession.builder.master("local[4]").appName("passed_rain_analyse").getOrCreate()
    df = spark.read.csv(filename,header = True)
    df_temperature = df.select( #选择需要的列
            df['province'],
            df['city_name'],
            df['city_code'],
            df['temperature'].cast(DecimalType(scale=1)),
            F.date_format(df['time'],"yyyy-MM-dd").alias("date"), #得到日期数据
            F.hour(df['time']).alias("hour") #得到小时数据
    )
    # 筛选四点时次
    #df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2,4,6,8]))
df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2,8,14,20]))
    #df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24]))
    df_avg_temperature = df_4point_temperature.groupBy("province","city_name","city_code","date")        .agg(F.count("temperature"),F.avg("temperature").alias("avg_temperature"))        .filter("count(temperature) = 4")        .sort(F.asc("avg_temperature"))        .select("province","city_name","city_code","date",F.format_number('avg_temperature',1).alias("avg_temperature"))
    df_avg_temperature.cache()
    avg_temperature_list = df_avg_temperature.collect()
    df_avg_temperature.coalesce(1).write.json("file:///home/lee/lab5/passed_temperature.json")
    print ("气温分析完毕")
    return avg_temperature_list[0:10]#最低的10个

可视化:

思路:

使用python matplotlib库进行绘图,

第一步,应当设置字体,这里提供了黑体的字体文件simhei.tff。否则坐标轴等出现中文的地方是乱码。

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

第二步,设置数据(累积雨量或者日平均气温)和横轴坐标(城市名称),配置直方图。

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

第三步,配置横轴坐标位置,设置纵轴坐标范围

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

第四步,配置横纵坐标标签

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

第五步,配置每个条形图上方显示的数据

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

第六步,根据上述配置,画出直方图。(见下方,按住CTRL点我去)

其他个性化代码:

直方图颜色

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

color=’ckrmgby’,一个七种颜色,分别对应青、黑、红、洋红、绿、蓝、黄

字体大小、颜色:

大小使用fontsize属性,颜色仍然是color属性

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

设置图的大小:使用figsize属性

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

部分代码:

def draw_rain(rain_list):
    print ("开始绘制累积降雨量图")
    font = FontProperties(fname='ttf/simhei.ttf') # 设置字体
    name_list = []
    num_list = []
    for item in rain_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name)
        num_list.append(item.rain24h)
    index = [i+0.25 for i in range(0,len(num_list))]
    plt.figure(figsize=(15,12))#设置图的大小
    rects=plt.bar(index, num_list, color='ckrmgby',width = 0.5)
    plt.xticks([i+0.25 for i in index], name_list, fontproperties = font,fontsize=15,color='r')#fontsize设置x刻度字体大小
    plt.ylim(ymax=(int(max(num_list)+100)/100)*20, ymin=0)#设置刻度间隔
    plt.yticks(fontsize=20,color='r')#fontsize设置y刻度字体大小
    plt.xlabel("城市",fontproperties = font,fontsize=25,color='c')#fontsize设置x坐标标签字体大小
    plt.ylabel("雨量",fontproperties = font,fontsize=25,color='c')#fontsize设置y坐标标签字体大小
    plt.title("过去24小时累计降雨量全国前20名",fontproperties = font,fontsize=30,color='b')#fontsize设置标题字体大小
    for rect in rects:
        height = rect.get_height()
        #fontsize设置直方图上字体大小
        plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom",fontsize=15)
    plt.show()
    print ("累积降雨量图绘制完毕!")

def draw_temperature(temperature_list):
    print ("开始绘制气温图")
    font = FontProperties(fname='/home/lee/lab5/ttf/simhei.ttf')
    name_list = []
    num_list = []
    #print(temperature_list[1])
    date = temperature_list[1].date
    for item in temperature_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name)
        num_list.append(float(item.avg_temperature))
    index = [i+0.25 for i in range(0,len(num_list))]
    plt.figure(figsize=(15,12))#设置图的大小
    rects=plt.bar(index, num_list, color='ckrmgby',width = 0.5)
    plt.xticks([i+0.25 for i in index], name_list, fontproperties = font,fontsize=20,color='r')#fontsize设置x刻度字体大小
    plt.ylim(ymax = math.ceil(float(max(num_list)))*1.5, ymin = 0)#设置刻度间隔
    plt.yticks(fontsize=20,color='r')#fontsize设置y刻度字体大小
    plt.xlabel("城市",fontproperties = font,fontsize=25,color='c')#fontsize设置坐标标签字体大小
    plt.ylabel("日平均气温",fontproperties = font,fontsize=25,color='c')#fontsize设置坐标标签字体大小
    plt.title(date + "全国日平均气温最低前10名",fontproperties = font,fontsize=30,color='b')#fontsize设置标题字体大小
    for rect in rects:
        height = rect.get_height()
        #fontsize设置直方图上字体大小
        plt.text(rect.get_x() + rect.get_width() / 2, height+0.1, str(height), ha="center", va="bottom",fontsize=15)
    plt.show()
    print ("气温图绘制完毕!")

参考代码(适用于python3):

完整代码

#Crawler类(数据获取):
#!/usr/bin/env python
# coding: utf-8

# In[7]:


import urllib.request,urllib.error
import json
import csv
import chardet
import codecs
import os
import time
 
import importlib,sys
importlib.reload(sys)
 
class Crawler:    
    def get_html(self,url):        
        request = urllib.request.Request(url)
        response = urllib.request.urlopen(request)
        return response.read().decode()
    def parse_json(self,url):
        obj = self.get_html(url)
        if obj:
            json_obj = json.loads(obj)
        else:
            json_obj = list()
        return json_obj
 
    def write_csv(self,file,data):
        if data:
            print ("开始写入 " + file)
            with open(file,'a+',encoding='utf-8-sig') as f:#utf-8-sig  带BOM的utf-8
                f_csv = csv.DictWriter(f,data[0].keys())
                #if not os.path.exists(file):
                f_csv.writeheader()
                f_csv.writerows(data) 
            print ("结束写入 " + file)
 
    def write_header(self,file,data):
        if data:
            print ("开始写入 " + file)
            with open(file,'a+',encoding='utf-8-sig') as f:
                f_csv = csv.DictWriter(f,data[0].keys())
                f_csv.writeheader()
                f_csv.writerows(data) 
            print ("结束写入 " + file)
 
    def write_row(self,file,data):
        if data:
            print ("开始写入 " + file)
            with open(file,'a+',encoding='utf-8-sig') as f:
                f_csv = csv.DictWriter(f,data[0].keys())
                if not os.path.exists(file):
                    f_csv.writeheader()
                f_csv.writerows(data) 
            print ("结束写入 " + file)
 
    def read_csv(self,file):
        print ("开始读取 " + file)
        with open(file,'r+',encoding='utf-8-sig') as f:
            data = csv.DictReader(f)
            print ("结束读取 " + file)
            return list(data)
 
    def get_provinces(self):
        province_file = 'input/province.csv'
        if not os.path.exists(province_file):  
            print ("开始爬取省份")
            provinces = self.parse_json('http://www.nmc.cn/f/rest/province')
            print ("省份爬取完毕!")
            self.write_csv(province_file,provinces)
        else:
            provinces = self.read_csv(province_file)
        return provinces
 
    def get_cities(self):
        city_file = 'input/city.csv'
        if not os.path.exists(city_file):
            cities = list()
            print ("开始爬取城市")
            for province in self.get_provinces():
                url = province['url'].split('/')[-1].split('.')[0]
                cities.extend(self.parse_json('http://www.nmc.cn/f/rest/province/'+url))
            self.write_csv(city_file,cities)
            print ("爬取城市完毕!")
        else:
            cities = self.read_csv(city_file)
        return cities
 
    def get_passed_weather(self,province):
        weather_passed_file = 'input/passed_weather_' + province + '.csv'
        if os.path.exists(weather_passed_file):
            return
        passed_weather = list()
        count = 0
        if province == 'ALL':
            print ("开始爬取过去的天气状况")
            for city in self.get_cities():
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/'+city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                        item['city_index'] = str(count)
                    passed_weather.extend(data)
                if count % 50 == 0:
                    if count == 50:
                        self.write_header(weather_passed_file,passed_weather)
                    else:
                        self.write_row(weather_passed_file,passed_weather)
                    passed_weather = list()
            if passed_weather:
                if count <= 50:
                    self.write_header(weather_passed_file,passed_weather)
                else:
                    self.write_row(weather_passed_file,passed_weather)
            print ("爬取过去的天气状况完毕!")
        else:
            print ("开始爬取过去的天气状况")
            select_city = filter(lambda x:x['province']==province,self.get_cities())
            for city in select_city:
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/'+city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_index'] = str(count)
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                    passed_weather.extend(data)
            self.write_csv(weather_passed_file,passed_weather)
            print ("爬取过去的天气状况完毕!")
 
    def run(self,range = 'ALL'):
        self.get_passed_weather(range)
 
if __name__ == '__main__':
    cr=Crawler()
    cr.run('ALL')
#SparkSql类(分析+可视化,引入Crawler类之后也可以爬取,前提是passed_weather_ALL.csv不存在;每次运行前需要删除passed_temperature.json和passed_rain_analyse.csv这两个文件夹)

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType,TimestampType
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
import os
import math
from Crawler import *
import importlib,sys
importlib.reload(sys)

def passed_rain_analyse(filename): #计算各个城市过去24小时累积雨量
    print ("开始分析累积降雨量")
    #spark = SparkSession.builder.master("spark://master:7077").appName("passed_rain_analyse").getOrCreate()
    #spark = SparkSession.builder.master("local[4]").appName("passed_rain_analyse").getOrCreate()
    spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate()
    
    df = spark.read.csv(filename,header = True)
    
    df_rain = df.select(df['province'],df['city_name'],df['city_code'],df['rain1h'].cast(DecimalType(scale=1)))        .filter(df['rain1h'] < 1000) #筛选数据,去除无效数据
    df_rain_sum = df_rain.groupBy("province","city_name","city_code")        .agg(F.sum("rain1h").alias("rain24h"))        .sort(F.desc("rain24h")) # 分组、求和、排序
    df_rain_sum.cache()
    df_rain_sum.coalesce(1).write.csv("file:///home/lee/lab5/passed_rain_analyse.csv")
    #spark.catalog.refreshTable(filename)
    print ("累积降雨量分析完毕!")
    return df_rain_sum.head(20)#前20个

def passed_temperature_analyse(filename):
    print ("开始分析气温")
    #spark = SparkSession.builder.master("spark://master:7077").appName("passed_temperature_analyse").getOrCreate()
    spark = SparkSession.builder.master("local").appName("passed_temperature_analyse").getOrCreate()
    #spark = SparkSession.builder.master("local[4]").appName("passed_rain_analyse").getOrCreate()
    df = spark.read.csv(filename,header = True)
    df_temperature = df.select( #选择需要的列
            df['province'],
            df['city_name'],
            df['city_code'],
            df['temperature'].cast(DecimalType(scale=1)),
            F.date_format(df['time'],"yyyy-MM-dd").alias("date"), #得到日期数据
            F.hour(df['time']).alias("hour") #得到小时数据
    )
    # 筛选四点时次
    #df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2,4,6,8]))
    df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2,8,14,20]))
    #df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24]))
    df_avg_temperature = df_4point_temperature.groupBy("province","city_name","city_code","date")        .agg(F.count("temperature"),F.avg("temperature").alias("avg_temperature"))        .filter("count(temperature) = 4")        .sort(F.asc("avg_temperature"))        .select("province","city_name","city_code","date",F.format_number('avg_temperature',1).alias("avg_temperature"))
    df_avg_temperature.cache()
    avg_temperature_list = df_avg_temperature.collect()
    df_avg_temperature.coalesce(1).write.json("file:///home/lee/lab5/passed_temperature.json")
    print ("气温分析完毕")
    return avg_temperature_list[0:10]#最低的10个


def draw_rain(rain_list):
    print ("开始绘制累积降雨量图")
    font = FontProperties(fname='ttf/simhei.ttf') # 设置字体
    name_list = []
    num_list = []
    for item in rain_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name)
        num_list.append(item.rain24h)
    index = [i+0.25 for i in range(0,len(num_list))]
    plt.figure(figsize=(15,12))#设置图的大小
    rects=plt.bar(index, num_list, color='ckrmgby',width = 0.5)
    plt.xticks([i+0.25 for i in index], name_list, fontproperties = font,fontsize=15,color='r')#fontsize设置x刻度字体大小
    plt.ylim(ymax=(int(max(num_list)+100)/100)*20, ymin=0)#设置刻度间隔
    plt.yticks(fontsize=20,color='r')#fontsize设置y刻度字体大小
    plt.xlabel("城市",fontproperties = font,fontsize=25,color='c')#fontsize设置x坐标标签字体大小
    plt.ylabel("雨量",fontproperties = font,fontsize=25,color='c')#fontsize设置y坐标标签字体大小
    plt.title("过去24小时累计降雨量全国前20名",fontproperties = font,fontsize=30,color='b')#fontsize设置标题字体大小
    for rect in rects:
        height = rect.get_height()
        #fontsize设置直方图上字体大小
        plt.text(rect.get_x() + rect.get_width() / 2, height, str(height), ha="center", va="bottom",fontsize=15)
    plt.show()
    print ("累积降雨量图绘制完毕!")

def draw_temperature(temperature_list):
    print ("开始绘制气温图")
    font = FontProperties(fname='/home/lee/lab5/ttf/simhei.ttf')
    name_list = []
    num_list = []
    #print(temperature_list[1])
    date = temperature_list[1].date
    for item in temperature_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name)
        num_list.append(float(item.avg_temperature))
    index = [i+0.25 for i in range(0,len(num_list))]
    plt.figure(figsize=(15,12))#设置图的大小
    rects=plt.bar(index, num_list, color='ckrmgby',width = 0.5)
    plt.xticks([i+0.25 for i in index], name_list, fontproperties = font,fontsize=20,color='r')#fontsize设置x刻度字体大小
    plt.ylim(ymax = math.ceil(float(max(num_list)))*1.5, ymin = 0)#设置刻度间隔
    plt.yticks(fontsize=20,color='r')#fontsize设置y刻度字体大小
    plt.xlabel("城市",fontproperties = font,fontsize=25,color='c')#fontsize设置坐标标签字体大小
    plt.ylabel("日平均气温",fontproperties = font,fontsize=25,color='c')#fontsize设置坐标标签字体大小
    plt.title(date + "全国日平均气温最低前10名",fontproperties = font,fontsize=30,color='b')#fontsize设置标题字体大小
    for rect in rects:
        height = rect.get_height()
        #fontsize设置直方图上字体大小
        plt.text(rect.get_x() + rect.get_width() / 2, height+0.1, str(height), ha="center", va="bottom",fontsize=15)
    plt.show()
    print ("气温图绘制完毕!")

def main():
    sourcefile = "input/passed_weather_ALL.csv"
    if not os.path.exists(sourcefile):
        crawler = Crawler()
        crawler.run('ALL')
    rain_list = passed_rain_analyse('file:///home/lee/lab5/' + sourcefile)
    draw_rain(rain_list)
    temperature_list = passed_temperature_analyse('file:///home/lee/lab5/' + sourcefile)
    draw_temperature(temperature_list)

if __name__ == '__main__':
    main()

运行结果:

数据获取:

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

数据分析:

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

数据可视化大图在下边

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

大图在下边:

分别对应rain.png和temperature.png

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫

spark编程计算各城市的平均气温实训,大数据开源,大数据,信息可视化,spark,数据分析,爬虫文章来源地址https://www.toymoban.com/news/detail-779157.html

到了这里,关于大数据开源框架之基于Spark的气象数据处理与分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析

    案例数据集是在线零售业务的交易数据,采用Python为编程语言,采用Hadoop存储数据,采用Spark对数据进行处理分析,并使用Echarts做数据可视化。由于案例公司商业模式类似新零售,或者说有向此方向发展利好的趋势,所以本次基于利于公司经营与发展的方向进行数据分析。

    2024年02月11日
    浏览(49)
  • NCDC气象数据的提取与处理(四):python批量读取、写入nc数据经纬度格点数值

    1.问题描述: 2.思路: 3.实现过程: 3.1格点位置匹配 3.2写入表格 4.运行效果 4.1打包站点信息 4.2读取nc文件列表 4.3提取对应格点的nc数据 4.4数据写入 NCDC的站点数据处理在之前三节里已经介绍过了,但是NCDC的就那么几种数据可能不能满足日常使用,比如说辐射数据他就没有。

    2024年02月05日
    浏览(77)
  • 基于Spark的气象数据分析

    研究背景与方案 1.1.研究背景 在大数据时代背景下,各行业数据的规模大幅度增加,数据类别日益复杂,给数据分析工作带来极大挑战。 气象行业和人们 的生活息息相关,随着信息时代的发展,大数据技术的出现为气象数据的发展带来机遇。基于此,本项目使用 Spark 等大

    2024年02月09日
    浏览(55)
  • 加速大规模数据处理和多维分析:基于Lucene和Hadoop的开源项目

    大数据时代带来了处理和分析海量数据的挑战,我很高兴向大家介绍我的个人开源项目:Lucene-Hadoop。这个项目基于Lucene和Hadoop,旨在提供高效的数据存储和查询引擎,加速大规模数据处理和多维分析。 项目介绍 https://github.com/arlixu/lucene-hadoop Lucene-Hadoop利用Lucene和Hadoop的强大

    2024年02月08日
    浏览(43)
  • BIC-2022-BDT:区块链和基于数字双胞胎的智能制造高效数据处理安全框架

    摘要 工业物联网具有智能连接、数据实时处理、协同监测、信息自动处理等特点,是物联网时代的重要组成部分之一。异构工业物联网设备对高数据速率、高可靠性、高覆盖、低延迟的要求,已成为信息安全领域的一大挑战。工业物联网中的智能制造产业需要多方协同的信息

    2024年02月06日
    浏览(52)
  • 大数据处理:利用Spark进行大规模数据处理

    大数据处理是指对大规模、高速、多源、多样化的数据进行处理、分析和挖掘的过程。随着互联网、人工智能、物联网等领域的发展,大数据处理技术已经成为当今科技的核心技术之一。Apache Spark是一个开源的大数据处理框架,它可以处理批量数据和流式数据,并提供了一系

    2024年03月22日
    浏览(55)
  • 【spark大数据】spark大数据处理技术入门项目--购物信息分析

    购物信息分析基于spark 目录 本案例中三个文案例中需要处理的文件为 order_goods.txt、products.txt 以及 orders.txt 三个文件,三个文件的说明如下 一、本实训项目针对实验数据主要完成了哪些处理? 二、Hadoop+Spark集群环境的搭建步骤有哪些?(只介绍完全分布式集群环境的搭建)

    2023年04月08日
    浏览(67)
  • spark 数据倾斜处理

    1. 对多次使用的RDD进行持久化 同常内存够的时候建议使用:MEMORY_ONLY 如果内存不够的时候使用 通常建议使用:MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。 2. 使用高性能的算子 3. 广播大变量 4. 使用Kryo优化序列化性能 Kryo序列化器介绍: Spark支持使用Kryo序列化机制。Kryo序列化

    2024年02月11日
    浏览(48)
  • Spark大数据处理讲课笔记4.1 Spark SQL概述、数据帧与数据集

      目录 零、本讲学习目标 一、Spark SQL (一)Spark SQL概述 (二)Spark SQL功能 (三)Spark SQL结构 1、Spark SQL架构图 2、Spark SQL三大过程 3、Spark SQL内部五大组件 (四)Spark SQL工作流程 (五)Spark SQL主要特点 1、将SQL查询与Spark应用程序无缝组合 2、Spark SQL以相同方式连接多种数据

    2024年02月09日
    浏览(63)
  • Spark Streaming实时数据处理

    作者:禅与计算机程序设计艺术 Apache Spark™Streaming是一个构建在Apache Spark™之上的快速、微批次、容错的流式数据处理系统,它可以对实时数据进行高吞吐量、低延迟地处理。Spark Streaming既可用于流计算场景也可用于离线批处理场景,而且可以将结构化或无结构化数据源(如

    2024年02月06日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包