sparkSql数据离线处理--整理记录

这篇具有很好参考价值的文章主要介绍了sparkSql数据离线处理--整理记录。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

sparkSql数据离线处理

前言:本文作为本人学习sparkSql离线数据抽取,离线数据处理的学习整理记录,文中参考博客均附上原文链接。

一、Hive环境准备

1、配置文件准备:

/opt/hive/conf/hive-site.xml:(2021/12/31修改,添加了&useSSL=false&useUnicode=true&characterEncoding=utf8支持中文编码)

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=utf8</value>
    <description>hive的元数据库 </description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>mysql的驱动jar包 </description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>设定数据库的用户名 </description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>xxx</value>
    <description>设定数据库的密码</description>
   </property>
<!--zbt添加-->
   <property>
      <name>hive.exec.max.dynamic.partitions</name>
      <value>100000</value>
      <description>在所有执行MR的节点上,最大一共可以创建多少个动态分区</description>
   </property>
   <property>
      <name>hive.exec.max.dynamic.partitions.pernode</name>
      <value>100000</value>
      <description>在所有执行MR的节点上,最大可以创建多少个动态分区</description>
  </property>
</configuration>

若要在idea环境下运行要把

hdfs-site.xml

core-site.xml

sparkSql数据离线处理--整理记录

hive-site.xml

sparkSql数据离线处理--整理记录

放到resources文件夹中

sparkSql数据离线处理--整理记录

否则hive.exec.max.dynamic.partitions.pernode,hive.exec.max.dynamic.partitions

配置不生效

2、hosts设置

若在不同网络环境下

需设置本地hosts

sparkSql数据离线处理--整理记录

设置的内容为集群主机名

Ubuntu的hosts文件在 /etc

参考资料:(10条消息) java.lang.IllegalArgumentException: java.net.UnknownHostException: xxx_小健的博客-CSDN博客

3、远程连接服务开启

hive --service metastore

参考资料:(13条消息) hive的几种启动方式_lbl的博客-CSDN博客_hive启动

4、其他

mysql服务启动

service mysqld start

防火墙关闭

systemctl stop firewalld

二、IDEA环境准备

1、pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sparkDome1</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>HiveAndMysql</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <hadoop.version>2.7.7</hadoop.version>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <!--hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!--scala依赖-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
        <!--spark依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--hive依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <!--includes是一个数组,包含要编译的code-->
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

2、Hadoop环境

window下运行需要准备Hadoop环境

在代码编写中指定hadoop.home.dir

System.setProperty("hadoop.home.dir","........")

sparkSql数据离线处理--整理记录

3、其他

Scala插件依赖需先下载好

注意环境与集群对应,本文档的环境为Scala-11

三、代码编写

1、全量抽取

import org.apache.spark.sql.SparkSession

/**
   * mysql->hive 全量抽取
   */
   object ShopTest {

  def main(args: Array[String]): Unit = {
	//设置用户名,防止因为权限不足无法创建文件
    System.setProperty("HADOOP_USER_NAME", "root")
    //获取实例对象
    val spark = SparkSession.builder()
      .appName("ShopTest")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()
    //jdbc连接配置
    val mysqlMap = Map(
      "url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false",
      "user" -> "root",
      "password" -> "xxx",
      "driver" -> "com.mysql.jdbc.Driver"
    )
    //使用jdbc抽取mysql表数据
    val inputTable = spark.read.format("jdbc")
      .options(mysqlMap)
      .option("dbtable", "EcData_tb_1")
      .load()
    
    //    inputTable.show()
      
    //将mysql表数据创建为临时表
    inputTable.createOrReplaceTempView("inputTable")
    //hive动态分区开启
    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    //hive分区模式设置,默认为strict严格模式,若设置分区必须要有一个静态分区
    //需要设置为nonstrict模式,可以都是动态分区
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    //hive分区数设置,目前版本已无法在程序中设置,参考上文Hive环境准备-配置文件准备
    spark.sqlContext.sql("set hive.exec.max.dynamic.partitions.pernode = 10000")
    spark.sqlContext.sql("set hive.exec.max.dynamic.partitions = 10000")

    // mysql表结构,通过desc table tb_name;命令可获取

    /*    +-------------+---------+------+-----+---------+-------+
        | Field       | Type    | Null | Key | Default | Extra |
        +-------------+---------+------+-----+---------+-------+
        | InvoiceNo   | text    | YES  |     | NULL    |       |
        | StockCode   | text    | YES  |     | NULL    |       |
        | Description | text    | YES  |     | NULL    |       |
        | Quantity    | int(11) | YES  |     | NULL    |       |
        | InvoiceDate | text    | YES  |     | NULL    |       |
        | UnitPrice   | double  | YES  |     | NULL    |       |
        | CustomerID  | int(11) | YES  |     | NULL    |       |
        | Country     | text    | YES  |     | NULL    |       |
        +-------------+---------+------+-----+---------+-------+*/
    
    //于hive数据库,ods层中创建表
    spark.sqlContext.sql(
      """
        |create table if not exists clown_test_db.ShopTest_ods_tb_1
        |(
        | InvoiceNo string ,
        | StockCode string ,
        | Description string ,
        | Quantity int ,
        | InvoiceDate string ,
        | UnitPrice double ,
        | CustomerID int ,
        | Country string
        |)
        |partitioned by (country_pid string,customer_pid int)
        |row format delimited
        |fields terminated by '\t'  //本数据中字段值存在','不能用','作为分隔符
        |lines terminated by '\n'
        |stored as textfile
        |""".stripMargin)
      
    //使用sql-insert into 语句将mysql数据全部导入hive表中
    spark.sqlContext.sql(
      """
        |insert into table clown_test_db.ShopTest_ods_tb_1 partition(country_pid,customer_pid)
        |select *,Country,CustomerID from inputTable
        |""".stripMargin)

  }

}

2、增量抽取

import java.text.SimpleDateFormat

import org.apache.spark.sql.{SaveMode, SparkSession}

/**

 * hive_ods -> hive_dwd 增量抽取
   */
   object ShopTest2 {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","root")

    val spark = SparkSession.builder()
      .appName("ShopTest2")
      .master("local[*]")
      .config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()
    
    /*    +-------------+---------+------+-----+---------+-------+
    | Field       | Type    | Null | Key | Default | Extra |
    +-------------+---------+------+-----+---------+-------+
    | InvoiceNo   | text    | YES  |     | NULL    |       |
    | StockCode   | text    | YES  |     | NULL    |       |
    | Description | text    | YES  |     | NULL    |       |
    | Quantity    | int(11) | YES  |     | NULL    |       |
    | InvoiceDate | text    | YES  |     | NULL    |       |
    | UnitPrice   | double  | YES  |     | NULL    |       |
    | CustomerID  | int(11) | YES  |     | NULL    |       |
    | Country     | text    | YES  |     | NULL    |       |
    +-------------+---------+------+-----+---------+-------+*/

	//隐式转换,sql方法导入
    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    
    //直接通过sql语句获取到hive ods层中的表数据
    val inputData = spark.sqlContext.sql("select * from clown_test_db.ShopTest_ods_tb_1")
    
    //设置时间条件
    val timeStr = "2011/01/01 00:00"
    val timeTemp = new SimpleDateFormat("yyyy/MM/dd HH:mm").parse(timeStr).getTime//单位为ms
    println(timeTemp)
    
    //未转换前的数据格式为:12/8/2010 9:53
    val timeFormat = inputData
      .withColumn("InvoiceDate",unix_timestamp($"InvoiceDate","MM/dd/yyyy HH:mm"))//时间戳获取,单位为s
      .where(s"InvoiceDate>$timeTemp/1000")//增量条件判断
      .withColumn("InvoiceDate",from_unixtime($"InvoiceDate","yyyy/MM/dd HH:mm"))//时间格式转换
      .where("Country='United Kingdom' or Country = 'Finland'")//筛选出国家名为United Kingdom 或 Finland的数据
    
    //由于该ods层表与目标dwd层表结构相同,直接用like语句创建结构相同的dwd表
    spark.sqlContext.sql(
      """
        |create table if not exists clown_dwd_db.shoptest_dwd_tb_1
        |like clown_test_db.ShopTest_ods_tb_1
        |""".stripMargin)


    //使用sparkSql算子将数据由ods表数据增量抽取到dwd表中
    timeFormat.write.format("hive")
      .mode(SaveMode.Append)
      .insertInto("clown_dwd_db.shoptest_dwd_tb_1")
  }

}

3、数据清洗

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * hive_dwd->hive_dwd 缺失值剔除与填充
 */
object ShopTest3 {
 /*+-------------+---------+------+-----+---------+-------+
   | Field       | Type    | Null | Key | Default | Extra |
   +-------------+---------+------+-----+---------+-------+
   | InvoiceNo   | text    | YES  |     | NULL    |       |
   | StockCode   | text    | YES  |     | NULL    |       |
   | Description | text    | YES  |     | NULL    |       |
   | Quantity    | int(11) | YES  |     | NULL    |       |
   | InvoiceDate | text    | YES  |     | NULL    |       |
   | UnitPrice   | double  | YES  |     | NULL    |       |
   | CustomerID  | int(11) | YES  |     | NULL    |       |
   | Country     | text    | YES  |     | NULL    |       |
   +-------------+---------+------+-----+---------+-------+*/
def main(args: Array[String]): Unit = {
   System.setProperty("HADOOP_USER_NAME","root")

   val spark = SparkSession.builder()
     .appName("ShopTest3")
     .master("local[*]")
     .config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
     .config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
     .enableHiveSupport()
     .getOrCreate()

   import spark.implicits._
   import org.apache.spark.sql.functions._

   spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
   spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")

   val data = spark.sqlContext.sql("select * from clown_dwd_db.shoptest_dwd_tb_1")

   spark.sqlContext.sql(
     """
       |create table if not exists clown_dwd_db.shopTest_dwd_tb_3
       |(
       | InvoiceNo string ,
       | StockCode string ,
       | Description string ,
       | Quantity int ,
       | InvoiceDate string ,
       | UnitPrice double ,
       | CustomerID int ,
       | Country string
       |)
       |partitioned by (country_pid string)
       |row format delimited
       |fields terminated by '\t'
       |lines terminated by '\n'
       |stored as textfile
       |""".stripMargin)

  //使用na.fill对缺失值进行填充
  //使用na.drop对缺失值进行剔除
   data.na.fill(
     Map(
       "Country"->"Country_Null",
       "CustomerID"->0
     )
   )
     .na.drop(
     Seq("UnitPrice","Quantity")
   )      .selectExpr("InvoiceNo","StockCode","Description","Quantity","InvoiceDate","UnitPrice","CustomerID","Country","Country")//由于数据中存在分区表字段,且该字段关联数据已改变,需要重新进行赋值
     .limit(10000)
     .write
     .format("hive")
     .mode(SaveMode.Append)
     .insertInto("clown_dwd_db.shopTest_dwd_tb_3")

  }
}

4、指标计算

import org.apache.spark.sql.SparkSession

/**
  * sparkSql算子实现指标计算
  */
object ShopTest4 {
  /*    +-------------+---------+------+-----+---------+-------+
| Field       | Type    | Null | Key | Default | Extra |
+-------------+---------+------+-----+---------+-------+
| InvoiceNo   | text    | YES  |     | NULL    |       |
| StockCode   | text    | YES  |     | NULL    |       |
| Description | text    | YES  |     | NULL    |       |
| Quantity    | int(11) | YES  |     | NULL    |       |
| InvoiceDate | text    | YES  |     | NULL    |       |
| UnitPrice   | double  | YES  |     | NULL    |       |
| CustomerID  | int(11) | YES  |     | NULL    |       |
| Country     | text    | YES  |     | NULL    |       |
+-------------+---------+------+-----+---------+-------+*/
  def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME", "root")
    
    val spark = SparkSession.builder()
      .appName("ShopTest4")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    
    val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
    
    /**
     * 统计每个国家的客户数,输出结果。
     * 排序后输出客户最多的10个国家
     */
    
        data.dropDuplicates("CustomerID","Country")//去重
          .withColumn("x",lit(1))//添加一列数据都为1
          .groupBy("Country")//聚合国家字段
          .sum("x")//对1数据字段进行累加
          .show(20)
    
    /**
     * 统计各个国家的总销售额分布情况
     */
    
    data.withColumn("x", $"Quantity" * $"UnitPrice")//添加销售额字段,值为数量*单价
      .groupBy("Country")//聚合国家字段
      .sum("x")//计算总销售额
      .withColumn("sum(x)", round($"sum(x)", 2))//对结果字段进行四舍五入到两位,但round会对最后一位0省略,最好使用其他函数
    /*若题目要求输出格式可进行rdd转换
      .rdd
      .map(x=>x.mkString(","))
      .foreach(println(_))
    */


    /**
     * 统计每种商品的销量,输出结果
     * 排序后输出销量最高的10种商品
     */
    data.groupBy("StockCode")//聚合商品编码字段
      .sum("Quantity")//计算销量
      .coalesce(1)//将spark分区设置为1,防止后面排序混乱
      .orderBy(desc("sum(Quantity)"))//由大到小排序
      .show(10)
    
    /**
     * 统计月销售额随时间的变化趋势
     * [月份,销售额]
     */
    data.withColumn("InvoiceDate",substring_index($"InvoiceDate","/",2))//由于数据在增量抽取阶段已进行时间格式转换,可直接进行切割得出 年份/月份 的格式,substring_index与split不同
      .withColumn("x",$"Quantity"*$"UnitPrice")//计算销售额
      .groupBy("InvoiceDate")//对月份进行聚合
      .sum("x")//计算总销售额
      .coalesce(1)//设置spark分区为1
      .orderBy(desc("InvoiceDate"))//由大到小排序
      .withColumn("sum(x)",round($"sum(x)",2))//四舍五入到2位
      .show(100)
    
    /**
     * 统计商品描述中,排名前300(Top300)的热门关键词
     */
    data.select(col("Description"))//商品将描述字段单独查询
      .flatMap(x=>x.toString().split("\\W"))//进行flatMap 切割后展平,切割\\W为正则匹配模式,匹配所有符号
      .withColumn("x",lit(1))//增加1的数据列
      .groupBy("value")//展平后字段名为value,进行聚合
      .sum("x")//累加1数据
      .where("value != '' ")//筛除空白数据
      .coalesce(1)//设置spark分区为1
      .orderBy(desc("sum(x)"))//由大到小排序
      .show(300)//展示300条

  }

}
import org.apache.spark.sql.SparkSession

/**
  *  sql语句实现指标计算
  */

object ShopTest5 {
  /*    +-------------+---------+------+-----+---------+-------+
| Field       | Type    | Null | Key | Default | Extra |
+-------------+---------+------+-----+---------+-------+
| InvoiceNo   | text    | YES  |     | NULL    |       |
| StockCode   | text    | YES  |     | NULL    |       |
| Description | text    | YES  |     | NULL    |       |
| Quantity    | int(11) | YES  |     | NULL    |       |
| InvoiceDate | text    | YES  |     | NULL    |       |
| UnitPrice   | double  | YES  |     | NULL    |       |
| CustomerID  | int(11) | YES  |     | NULL    |       |
| Country     | text    | YES  |     | NULL    |       |
+-------------+---------+------+-----+---------+-------+*/
  def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME", "root")
    
    val spark = SparkSession.builder()
      .appName("ShopTest5")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    
    val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
    
    data.createOrReplaceTempView("dataTable")
    
    /**
     * 统计每个国家的客户数,输出结果。
     * 排序后输出客户最多的10个国家
     */

   //对去重后的Country,CustomerID进行聚合统计即可得出各个国家的客户数
   spark.sqlContext.sql(
     """
       |select Country,count(distinct Country,CustomerID) from dataTable group by Country
       |""".stripMargin)
     .show()

    /**
     * 统计各个国家的总销售额分布情况
     */
    
    spark.sqlContext.sql(
      """
        |select Country ,round(sum(Quantity*UnitPrice),2)
        |from dataTable
        |group by Country
        |""".stripMargin)
      .show()
    
    /**
     * 统计每种商品的销量,输出结果
     * 排序后输出销量最高的10种商品
     */
    
    spark.sqlContext.sql(
      """
        |select StockCode,round(sum(Quantity*UnitPrice),2) as xl
        |from dataTable
        |group by StockCode
        |order by xl desc
        |""".stripMargin)
      .show(10)
    
    /**
     * 统计月销售额随时间的变化趋势
     * [月份,销售额]
     */
    
    //group by执行优先度可能高于 as 重命名,因此as后的名字无法用于group by 聚合
    spark.sqlContext.sql(
      """
        |select substring_index(InvoiceDate,"/",2) as time,round(sum(Quantity*UnitPrice),2) as sum
        |from dataTable
        |group by substring_index(InvoiceDate,"/",2)
        |order by substring_index(InvoiceDate,"/",2)
        |""".stripMargin)
      .show()
    
    /**
     * 统计商品描述中,排名前300(Top300)的热门关键词
     */
    
    //目前认为该题用sql解法没有必要
      //- - 
  }

}

四、其他

1、hive分区的增删改查

参考资料:(15条消息) HIve学习:Hive分区修改_u011047968的专栏-CSDN博客_hive修改分区

hive表新增分区:[]内的不必要

alter table tb_name add partition (pid1 = ‘’,pid2 = ) [location ‘xxx’] 

多个分区

alter table tb_name add partition (pid1 = ‘’,pid2 = ) partition (pid1 = ‘’,pid2 = ) [location ‘xxx’] 

hive表修改分区:

alter table tb_name partition(pid1='') rename to partition(pid1='');/*修改分区名*/
alter table tb_name partition(pid1='') set location 'hdfs://master:8020/....';/*修改分区路径,注意使用绝对路径*/  
alter table tb_name partition column (pid1 string);/*修改分区字段数据类型*/

hive表删除分区:

alter table tb_name drop partition (pid1 = ‘’,pid2 = )[ partition (pid1 = ‘’,pid2 = )] 

hive分区值查询:

show partitions tb_name;

2、spark打包运行

命令:

spark-submit --class ShopTest4 --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar

若使用了jdbc连接,需要指明驱动jar包 mysql-connector-java-5.1.48.jar

spark-submit --jars mysql-connector-java-5.1.48.jar --class ShopTest --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar

或者将mysql驱动放至 $‘spark_home’/jars 目录下

sparkSql数据离线处理--整理记录

3、时间格式

时间模式字符串用来指定时间格式。在此模式中,所有的 ASCII 字母被保留为模式字母,定义如下:

字母 描述 示例
G 纪元标记 AD
y 四位年份 2001
M 月份 July or 07
d 一个月的日期 10
h A.M./P.M. (1~12)格式小时 12
H 一天中的小时 (0~23) 22
m 分钟数 30
s 秒数 55
S 毫秒数 234
E 星期几 Tuesday
D 一年中的日子 360
F 一个月中第几周的周几 2 (second Wed. in July)
w 一年中第几周 40
W 一个月中第几周 1
a A.M./P.M. 标记 PM
k 一天中的小时(1~24) 24
K A.M./P.M. (0~11)格式小时 10
z 时区 Eastern Standard Time
文字定界符 Delimiter
" 单引号 `

4、Scala正则表达式

Scala 的正则表达式继承了 Java 的语法规则,Java 则大部分使用了 Perl 语言的规则。

下表我们给出了常用的一些正则表达式规则:(注意:\需要转义,算子中写为\,sql语句中写为\\\)

表达式 匹配规则
^ 匹配输入字符串开始的位置。
$ 匹配输入字符串结尾的位置。
. 匹配除"\r\n"之外的任何单个字符。
[…] 字符集。匹配包含的任一字符。例如,"[abc]“匹配"plain"中的"a”。
[^…] 反向字符集。匹配未包含的任何字符。例如,"[^abc]“匹配"plain"中"p”,“l”,“i”,“n”。
\A 匹配输入字符串开始的位置(无多行支持)
\z 字符串结尾(类似$,但不受处理多行选项的影响)
\Z 字符串结尾或行尾(不受处理多行选项的影响)
re* 重复零次或更多次
re+ 重复一次或更多次
re? 重复零次或一次
re{ n} 重复n次
re{ n,}
re{ n, m} 重复n到m次
a|b 匹配 a 或者 b
(re) 匹配 re,并捕获文本到自动命名的组里
(?: re) 匹配 re,不捕获匹配的文本,也不给此分组分配组号
(?> re) 贪婪子表达式
\w 匹配字母或数字或下划线或汉字
\W 匹配任意不是字母,数字,下划线,汉字的字符
\s 匹配任意的空白符,相等于 [\t\n\r\f]
\S 匹配任意不是空白符的字符
\d 匹配数字,类似 [0-9]
\D 匹配任意非数字的字符
\G 当前搜索的开头
\n 换行符
\b 通常是单词分界位置,但如果在字符类里使用代表退格
\B 匹配不是单词开头或结束的位置
\t 制表符
\Q 开始引号:\Q(a+b)*3\E 可匹配文本 “(a+b)*3”。
\E 结束引号:\Q(a+b)*3\E 可匹配文本 “(a+b)*3”。

正则表达式实例

实例 描述
. 匹配除"\r\n"之外的任何单个字符。
[Rr]uby 匹配 “Ruby” 或 “ruby”
rub[ye] 匹配 “ruby” 或 “rube”
[aeiou] 匹配小写字母 :aeiou
[0-9] 匹配任何数字,类似 [0123456789]
[a-z] 匹配任何 ASCII 小写字母
[A-Z] 匹配任何 ASCII 大写字母
[a-zA-Z0-9] 匹配数字,大小写字母
[^aeiou] 匹配除了 aeiou 其他字符
[^0-9] 匹配除了数字的其他字符
\d 匹配数字,类似: [0-9]
\D 匹配非数字,类似: [^0-9]
\s 匹配空格,类似: [ \t\r\n\f]
\S 匹配非空格,类似: [^ \t\r\n\f]
\w 匹配字母,数字,下划线,类似: [A-Za-z0-9_]
\W 匹配非字母,数字,下划线,类似: [^A-Za-z0-9_]
ruby? 匹配 “rub” 或 “ruby”: y 是可选的
ruby* 匹配 “rub” 加上 0 个或多个的 y。
ruby+ 匹配 “rub” 加上 1 个或多个的 y。
\d{3} 刚好匹配 3 个数字。
\d{3,} 匹配 3 个或多个数字。
\d{3,5} 匹配 3 个、4 个或 5 个数字。
\D\d+ 无分组: + 重复 \d
(\D\d)+/ 分组: + 重复 \D\d 对
([Rr]uby(, )?)+ 匹配 “Ruby”、“Ruby, ruby, ruby”,等等

常用可以应用正则的函数:

.split(“”)切割字符串

.regexp_extract(string subject, string pattern, int index) 将字符串subject按照pattern正则表达式的规则拆分,返回index指定的字符

.regexp_replace(string A, string B, string C) 将字符串A中的符合Java正则表达式B的部分替换为C

.equals(“”)匹配

5、SQL like与rlike

like为通配符匹配,不是正则

%:匹配零个及多个任意字符

_:与任意单字符匹配

[]:匹配一个范围

[^]:排除一个范围

rlike为正则匹配

regexp与rlike功能相似

参考资料:(15条消息) sparksql 正则匹配总结_Andrew LD-CSDN博客_spark 正则表达式

6、中文数据

关于csv文件若包含中文,可在读取时设置option参数

/**
 * 注意option的设置
 * 读取本地文件需要加上file:///否则默认读hdfs文件
 */
val inputData = spark.sqlContext.read.format("csv")
  .option("sep","\t")
  .option("encoding","GBK")
  .option("header","true")
 
.load("file:///C:\\Users\\61907\\Desktop\\BigData\\Spark\\sparkDome1\\HiveAndMysql\\src\\main\\resources\\cov19.csv")

jdbc读取数据库数据时,若有中文需设置jdbc连接参数

&useUnicode=true&characterEncoding=utf8

//    jdbc中文编码设置
    val mysqlMap = Map(
      "url"->"jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
      "user"->"root",
      "password"->"xxx",
      "driver"->"com.mysql.jdbc.Driver"
    )

关于hive中存储中文数据,中文注释,中文分区(索引)

Ⅰ~Ⅲ参考资料:

(16条消息) hive设置中文编码格式utf-8_2020xyz的博客-CSDN博客_hive建表指定编码格式

(16条消息) hive修改使用utf8编码支持中文字符集_那又怎样?的博客-CSDN博客_hive默认字符集编码

Ⅰ.元数据库设置

元数据库需设置为utf-8编码

##创建hive元数据库hive,并指定utf-8编码格式
mysql>create database hive DEFAULT CHARSET utf8 COLLATE utf8_general_ci;


##修改已存在的hive元数据库,字符编码格式为utf-8
mysql>alter database hive character set utf8;     


##进入hive元数据库
mysql>use hive;

##查看元数据库字符编码格式
mysql>show variables like 'character_set_database';  

sparkSql数据离线处理--整理记录

Ⅱ.相关表设置

1).修改字段注释字符集

mysql>alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;

sparkSql数据离线处理--整理记录

2).修改表注释字符集

mysql>alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;

sparkSql数据离线处理--整理记录

类似的,PARAM_KEY若需要中文也可设置为utf8

sparkSql数据离线处理--整理记录

3).修改分区表参数,以支持分区能够用中文表示

mysql>alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;


mysql>alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;

sparkSql数据离线处理--整理记录

sparkSql数据离线处理--整理记录

另外,PARTITIONS表中存放分区名的字段也需要修改为utf8

mysql>alter table PARTITIONS modify column PART_name varchar(4000) character set utf8;

sparkSql数据离线处理--整理记录

4).修改索引注解

mysql>alter table INDEX_PARAMS modify column PARAM_VALUE varchar(250) character set utf8;

sparkSql数据离线处理--整理记录

Ⅲ.hive-site.xml配置文件设置

需要在jdbc连接中设置支持中文编码

&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=utf8

其中&需要使用&amp;转义

参考资料:(16条消息) 【已解决】The reference to entity “useSSL” must end with the ‘;’ delimiter_清宵尚温的博客-CSDN博客

/opt/hive/conf/hive-site.xml:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=utf8</value>
    <description>hive的元数据库 </description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>mysql的驱动jar包 </description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>设定数据库的用户名 </description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>xxx</value>
    <description>设定数据库的密码</description>
   </property>
<!--zbt添加-->
   <property>
      <name>hive.exec.max.dynamic.partitions</name>
      <value>100000</value>
      <description>在所有执行MR的节点上,最大一共可以创建多少个动态分区</description>
   </property>
   <property>
      <name>hive.exec.max.dynamic.partitions.pernode</name>
      <value>100000</value>
      <description>在所有执行MR的节点上,最大可以创建多少个动态分区</description>
  </property>
</configuration>
Ⅳ.未解决问题

hdfs文件系统中显示

sparkSql数据离线处理--整理记录

虽然正常显示中文但在文件夹中会出现

Path does not exist on HDFS or WebHDFS is disabled. Please check your path or enable WebHDFS

sparkSql数据离线处理--整理记录

可能是中文路径导致的错误,但该错误目前未影响到分区表的正常操作,具体影响仍需实验。

Ⅴ.暴力脚本- -

参考资料:(16条消息) hive分区字段含中文导致的报错_一定要努力努力再努力的博客-CSDN博客_hive分区字段是中文

alter database hive_meta default character set utf8;
alter table BUCKETING_COLS default character set utf8;
alter table CDS default character set utf8;
alter table COLUMNS_V2 default character set utf8;
alter table DATABASE_PARAMS default character set utf8;
alter table DBS default character set utf8;
alter table FUNCS default character set utf8;
alter table FUNC_RU default character set utf8;
alter table GLOBAL_PRIVS default character set utf8;
alter table PARTITIONS default character set utf8;
alter table PARTITION_KEYS default character set utf8;
alter table PARTITION_KEY_VALS default character set utf8;
alter table PARTITION_PARAMS default character set utf8;
-- alter table PART_COL_STATS default character set utf8;
alter table ROLES default character set utf8;
alter table SDS default character set utf8;
alter table SD_PARAMS default character set utf8;
alter table SEQUENCE_TABLE default character set utf8;
alter table SERDES default character set utf8;
alter table SERDE_PARAMS default character set utf8;
alter table SKEWED_COL_NAMES default character set utf8;
alter table SKEWED_COL_VALUE_LOC_MAP default character set utf8;
alter table SKEWED_STRING_LIST default character set utf8;
alter table SKEWED_STRING_LIST_VALUES default character set utf8;
alter table SKEWED_VALUES default character set utf8;
alter table SORT_COLS default character set utf8;
alter table TABLE_PARAMS default character set utf8;
alter table TAB_COL_STATS default character set utf8;
alter table TBLS default character set utf8;
alter table VERSION default character set utf8;
alter table BUCKETING_COLS convert to character set utf8;
alter table CDS convert to character set utf8;
alter table COLUMNS_V2 convert to character set utf8;
alter table DATABASE_PARAMS convert to character set utf8;
alter table DBS convert to character set utf8;
alter table FUNCS convert to character set utf8;
alter table FUNC_RU convert to character set utf8;
alter table GLOBAL_PRIVS convert to character set utf8;
alter table PARTITIONS convert to character set utf8;
alter table PARTITION_KEYS convert to character set utf8;
alter table PARTITION_KEY_VALS convert to character set utf8;
alter table PARTITION_PARAMS convert to character set utf8;
-- alter table PART_COL_STATS convert to character set utf8;
alter table ROLES convert to character set utf8;
alter table SDS convert to character set utf8;
alter table SD_PARAMS convert to character set utf8;
alter table SEQUENCE_TABLE convert to character set utf8;
alter table SERDES convert to character set utf8;
alter table SERDE_PARAMS convert to character set utf8;
alter table SKEWED_COL_NAMES convert to character set utf8;
alter table SKEWED_COL_VALUE_LOC_MAP convert to character set utf8;
alter table SKEWED_STRING_LIST convert to character set utf8;
alter table SKEWED_STRING_LIST_VALUES convert to character set utf8;
alter table SKEWED_VALUES convert to character set utf8;
alter table SORT_COLS convert to character set utf8;
alter table TABLE_PARAMS convert to character set utf8;
alter table TAB_COL_STATS convert to character set utf8;
alter table TBLS convert to character set utf8;
alter table VERSION convert to character set utf8;
-- alter table PART_COL_STATS convert to character set utf8;
SET character_set_client = utf8 ;
-- SET character_set_connection = utf8 ;

-- alter table PART_COL_STATS convert to character set utf8;
SET character_set_database = utf8 ;
SET character_set_results = utf8 ;
SET character_set_server = utf8 ;
-- SET collation_connection = utf8 ;
-- SET collation_database = utf8 ;
-- SET collation_server = utf8 ;
SET NAMES 'utf8';

只复制了博客中修改表字段的部分

看看就好,最好还是根据需求修改。

Ⅵ.实例
import org.apache.spark.sql.{SaveMode, SparkSession}

object CNHivePartitionTest {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "D:\\BaiduNetdiskDownload\\hadoop-2.7.3")
    System.setProperty("HADOOP_USER_NAME", "root")

    val spark = SparkSession.builder()
      .appName("Cov19DataDome4")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")

    val mysqlMap = Map(
      "url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
      "user" -> "root",
      "password" -> "xxx",
      "driver" -> "com.mysql.jdbc.Driver"
    )

    val mysqlData = spark.read.format("jdbc")
      .options(mysqlMap)
      .option("dbtable","tc_hotel2")
      .load()

    spark.sqlContext.sql(
      """
        |create table if not exists clown_test_db.CNTest
        |(
        |  `hname` string,
        |  `hbrand` string,
        |  `province` string,
        |  `city` string,
        |  `starlevel` string,
        |  `rating` string,
        |  `comment_count` string,
        |  `price` string
        |)
        |partitioned by (pid string)
        |row format delimited
        |fields terminated by '\t'
        |lines terminated by '\n'
        |stored as textfile
        |""".stripMargin)

    mysqlData
      .select(col("*"),col("province"))
      .write
      .format("hive")
      .mode(SaveMode.Append)
      .insertInto("clown_test_db.CNTest")
  }

}

7、表连接join/union

参考资料:https://blog.csdn.net/m0_37809146/article/details/91282446

sparkSql数据离线处理--整理记录

val tb1 = spark.read.format("jdbc")
  .options(mysqlMap)
  .option("dbtable", "cov19_test_tb")
  .load()

val tb2 = spark.read.format("jdbc")
  .options(mysqlMap)
  .option("dbtable", "cov19_test_tb_2")
  .load()
  .withColumnRenamed("", "")

/**
 * inner 交集,只会联合给出字段都存在的数据
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "inner")
//      .show(100)
/**
 * right 右链接,展示右边表所有数据
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "right")
//      .show(100)
/**
 * left 左链接,展示左边表所有数据
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "left")
//      .show(100)

val testTb1 = tb1.withColumnRenamed("cityName", "tb1CN")
val testTb2 = tb2.withColumnRenamed("cityName", "tb1CN")

//默认 inner连接,进行连接的条件字段必须两边表都存在
testTb1.join(testTb2, "tb1CN")
//      .show()


/**
 * right_outer 右外连接,相当于左连接
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "right_outer")
//      .show(100)

/**
 * left_outer 左外连接,相当于右连接
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "left_outer")
//      .show(100)

/**
 * 外连接 类似把左右连接出的集合加起来- -
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "outer")
//      .show(100)

/**
 * 全连接
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "full")
//      .show(100)

/**
 * 全外连接
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "full_outer")
//      .show(100)

/**
 * 交集
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "left_semi")
  .show(100)

/**
 * 差集
 */
tb1.join(tb2, Seq("provinceName", "cityName"), "left_anti")
  .show(100)


/**
 * https://blog.csdn.net/wcc27857285/article/details/86439313
 * 其他知识点:
 * HAVING 子句
 * 在 SQL 中增加 HAVING 子句原因是,WHERE 关键字无法与聚合函数一起使用。
 *
 * SQL HAVING 语法
 * SELECT column_name, aggregate_function(column_name)
 * FROM table_name
 * WHERE column_name operator value
 * GROUP BY column_name
 * HAVING aggregate_function(column_name) operator value
 *
 *
 * --- JOIN ON
 * JOIN写连接字段
 * ON写匹配条件
 *
 */

8、自定义UDF,UDAF函数

Spark 2.4.0编程指南–Spark SQL UDF和UDAF-阿里云开发者社区 (aliyun.com)

(17条消息) Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator_L-CSDN博客

(17条消息) UDF和UDAF,UDTF的区别_山海-CSDN博客_udf和udtf区别

[(17条消息) Spark] 自定义函数 udf & pandas_udf_風の唄を聴け的博客-CSDN博客_pandas spark udf

9、数据集获取

UCI机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类和推荐系统任务。数据集列表位于:http://archive.ics.uci.edu/ml/

Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问。这些数据集包括人类基因组项目、Common Craw网页语料库、维基百科数据和Google Books Ngrams。相关信息可参见:http://aws.amazon.com/publicdatasets/

Kaggle:这里集合了Kaggle举行的各种机器学习竞赛所用的数据集。它们覆盖分类、回归、排名、推荐系统以及图像分析领域,可从

Competitions区域下载:http://www.kaggle.com/competitions

KDnuggets:这里包含一个详细的公开数据集列表,其中一些上面提到过的。该列表位于:http://www.kdnuggets.com/datasets/index.html

10、数仓分层概念

参考资料:(10条消息) 数据仓库–数据分层(ETL、ODS、DW、APP、DIM)_hello_java_lcl的博客-CSDN博客_dim层

sparkSql数据离线处理--整理记录

五、实战复盘

1、2022/1/3

题目:

sparkSql数据离线处理--整理记录

数据源:

csv文件(未修改)

sparkSql数据离线处理--整理记录

mysql表格(增加脏数据)

sparkSql数据离线处理--整理记录

环境准备:

1.mysql数据表格 2.hive目标表 3.pom文件

完成速度:

3h+

遇到问题:
1.data->mysql,数据保存

SaveMode.Overwrite 保存至mysql数据库,不仅会覆盖数据格式,字段名也会被覆盖

在做题途中遇到了保存SaveMode.Append失败的错误,修改为Overwrite 不报错,原因不明

是否解决:

出现错误

Unknown column 'sum' in 'field list'

原因是字段名与mysql数据库目标表中的字段名不同

修改字段名相同即可

.withColumnRenamed("sum","total_price")

在hive中是否有相同特性?

2.Join等表连接的使用

Join,union仍不熟悉 select子查询也比较生疏

是否解决: ✔?

join理解下图足够

sparkSql数据离线处理--整理记录

union联合要求字段相同 否则报错

3.Date计算

参考资料:https://blog.csdn.net/wybshyy/article/details/52064337

sparkSql数据离线处理--整理记录

使用datediff不需要转换时间格式

是否解决:

参考资料:

(18条消息) Spark-SQL常用内置日期时间函数_绿萝蔓蔓绕枝生-CSDN博客_sparksql 时间函数

(18条消息) sparksql 时间函数_OH LEI``-CSDN博客_sparksql时间函数

datediff 计算两个时间差天数 结果返回一个整数

对时间格式可能有要求例如‘2021/1/4‘这样的时间格式无法被计算(sql中,算子貌似没有这个问题)

sql写法:

spark.sql(
  """
    |select datediff('2021-1-4','2020-12-30')
    |""".stripMargin).show()

算子写法:

.withColumn("o",datediff(col("delivery_date"),col("order_date")))

months_between计算两个时间差月数 结果返回一个浮点数

sql写法:

spark.sql(
  """
    |select months_between('2021-1-4','2020-12-30')
    |""".stripMargin).show()

返回:0.16129032

若想返回整数月份可以将天数删除:

spark.sql(
  """
    |select months_between('2021-1','2020-12')
    |""".stripMargin).show()

返回:1.0

算子写法:

.withColumn("o",months_between(col("delivery_date"),col("order_date")))

直接用时间戳相减通过计算也可以

spark.sql(
  """
    |select (unix_timestamp('2022/1/1','yyyy/MM/dd') - unix_timestamp('2021/12/31','yyyy/MM/dd'))/60/60/24
    |""".stripMargin).show()

2022-4-20补充:
第三部分数据源:
链接:https://pan.baidu.com/s/1U7BF0eDC56ea3XfcqejftA 提取码:zzzz
–来自百度网盘超级会员V5的分享
拿完点赞支持一下~文章来源地址https://www.toymoban.com/news/detail-406027.html

到了这里,关于sparkSql数据离线处理--整理记录的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 全国职业院校技能大赛-大数据 离线数据处理模块-数据清洗

    子任务2:数据清洗         编写Hive SQL代码,将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。 抽取

    2024年02月02日
    浏览(49)
  • 全国职业院校技能大赛-大数据 离线数据处理模块-指标计算

    赛题来源2023年全国职业院校技能大赛赛题第1套任务B中指标计算模块 编写Scala代码,使用Spark计算相关指标。 注:在指标计算中,不考虑订单信息表中order_status字段的值,将所有订单视为有效订单。计算订单金额或订单总金额时只使用final_total_amount字段。需注意dwd所有的维表

    2024年02月01日
    浏览(50)
  • 数据处理方法整理【目前最全】

    主要采用编写数据处理代码来对数据进行处理步骤,主要采用Python语言,使用相关的Pandas、Numpy、Scikit-learn等库来进行操作,一些代码实例如下。 1、缺失数据处理 数据缺失是指数据集中某行记录或某列特征的变量值存在空值的情况。常用的缺失值处理方法主要包括以下几种

    2024年02月03日
    浏览(34)
  • 数据预处理方法整理(数学建模)

    这篇文章主要是整理了一些作者在各种建模比赛中遇到的数据预处理问题以及方法,主要针对excel或csv格式的数据,为后续进行机器学习或深度学习做前期准备 导入库和文件,这里使用的是绝对路径,可改为相对路径 传入的为csv格式的文件,如果是xlsx格式的文件,建议先使

    2024年02月14日
    浏览(53)
  • excel数据的编排与整理——行列的批量处理

    1.1 插入连续行 1.1.0 题目内容 1.1.1 选中插入的位置➡按住shift键➡往下选中2行 1.1.2 鼠标右击➡点击插入 1.1.3 插入后的效果 1.2 插入不连续行 1.2.0 题目内容 1.2.1 按下ctrl键➡选中插入的位置,需要插入多行时,需要按下shift键➡再往下选中1行 1.2.2 鼠标右击➡点击插入 1.2.3 插入后

    2024年02月10日
    浏览(35)
  • 【SparkSQL】基础入门(重点:SparkSQL和Hive的异同、SparkSQL数据抽象)

    【大家好,我是爱干饭的猿,本文重点介绍Spark SQL的定义、特点、发展历史、与hive的区别、数据抽象、SparkSession对象。 后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】 上一篇文章:《【Spark入门】基础入门》 SparkSQL 是Spark的一个模块, 用

    2024年02月21日
    浏览(33)
  • 【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户

    🚀 作者 :“大数据小禅” 🚀 文章简介 :本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容 🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪 SparkSQL简介 Spark

    2023年04月15日
    浏览(43)
  • 开源(离线)中文语音识别ASR(语音转文本)工具整理

    开源(离线)中文语音识别ASR(语音转文本)工具整理 Open AI在2022年9月21日开源了号称其英文语音辨识能力已达到人类水准的Whisper神经网络,且它亦支持其它98种语言的自动语音辨识。 Whisper系统所提供的自动语音辨识(Automatic Speech Recognition,ASR)模型是被训练来运行语音辨识与翻

    2024年02月13日
    浏览(65)
  • 图像处理及深度学习开源数据集大全(四万字呕心沥血整理)

    本文整理了150 余个深度学习和图像处理领域的开源数据集,包括:目标检测、人脸识别、文本识别、图像分类、缺陷检测、医学影像、图像分割、图像去雾、关键点检测、动作识别、姿态估计、自动驾驶、RGBT共13个方向。 T-LESS数据集 类型:目标检测 数量:39000 数据集下载地

    2024年02月03日
    浏览(69)
  • 大数据开发之SparkSQL

    1、spark sql是spark用于结构化数据处理的spark模块 1)半结构化数据(日志数据) 2)结构化数据(数据库数据) hive on spark:hive既作为存储元数据又负责sql的解析优化,语法是hql语法,执行引擎编程了spark,spark负责采用rdd执行。 spark on hive:hive只作为存储元数据,spark负责sql解

    2024年01月25日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包