目录
前言
题目:
一、读题分析
二、处理过程
1.采用SparkSQL使用max函数找到最大的日期然后转换成时间类型在变成字符串
2.这里提供除了SQL方法外的另一种过滤不满足条件的方法
三、重难点分析
总结
前言
本题来源于全国职业技能大赛之大数据技术赛项电商赛题-离线数据处理-抽取
题目:
提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)
一、读题分析
涉及组件:MYSQL,HIVE,SCALA,SPARK
涉及知识点:
- Spark读取数据库数据
- DataFrameAPI的使用(重点)
- Spark写入数据库数据
- Hive数据库的基本操作
- 增量数据的概念(思考:与全量数据有什么区别?)
二、处理过程
与全量数据类似,唯一不同的点在于抽取增量的数据只是在全量数据中的一部分(形象来说)。个人认为,这样在实际应用中,抽取增量数据比抽取全量数据,更节省时间,带宽,硬件处理频率。总来说,抽取增量数据而不是全量数据的目的就是减少资源的浪费。
1.采用SparkSQL使用max函数找到最大的日期然后转换成时间类型在变成字符串
import org.apache.spark.sql.SparkSession
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
object MysqlToHive {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit
val spark =SparkSession.builder().appName("mysqltoHive").master("spark://bigdata1:7077").enableHiveSupport().getOrCreate()
// 读取mysql的配置
val jdbcurl = "jdbc:mysql://bigdata1:3306/db"
val tablename = "table1"
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
properties.setProperty("driver", "com.mysql.jdbc.Driver")
// 读取mysql数据创建dataframe
val mysqlDF = spark.read.jdbc(jdbcurl, tablename, properties)
mysqlDF.createOrReplaceTempView("mysqldata")
// 读取hive数据ods库中最大的时间
spark.sql("use ods")
val hiveDF = spark.read.table("ods.table1")
hiveDF.createOrReplaceTempView("hivedata")
// 获取最大值
val maxValue = spark.sql("select max(modified_time) from hivedata").head().getTimestamp(0).toString
println("Hive最大的时间为:" + maxModifiedTime)
// 3. 使用Spark SQL查询获取customer_inf表中modified_time的最大值。
// 4. 使用head()方法获取结果集中的第一行数据。
// 5. 使用getTimestamp(0)方法获取第一列数据的Timestamp类型值。
// 6. 使用toString()方法将Timestamp类型值转换为字符串类型。
// 7. 打印最大修改时间的字符串值。
// 找到增量数据
val resultDF = spark.sql(s"select * from mysqldata where momdified_time > '$maxValue'")
// 取得昨天的日期
// 法1:
val sdf = new SimpleDateFormat("yyyyMMdd")
val str = sdf.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
// 法2:
val str = new SimpleDateFormat("yyyyMMdd").format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
val reDF = resultDF.withColumn("etl_date", lit(str))
reDF.write.mode(SaveMode.Append).partitionBy("etl_date").saveAsTable("ods.table1")
}
}
2.这里提供除了SQL方法外的另一种过滤不满足条件的方法
// 这里可以写死来模拟增量数据
// val givenTime = "2022-08-23 00:00:00"
val maxValue = spark.sql("select max(modified_time) from hivedata").head().getTimestamp(0).toString
// gt获取比givenTime时间大的数据
// lt小于
val dataf = df.filter(col("modified_time").lt(max)).toDF()
三、重难点分析
- 增量数据与全量数据的不同
- SparkSQL函数的使用
- 解决增量数据的方法
总结
什么是全量数据、增量数据?
全量数据和增量数据是在数据库系统迁移时的概念。
1.全量数据:
当前需要迁移的数据库系统的全部数据。
2.增量数据:
在数据库系统迁移过程中,对比原数据,新产生的数据即为增量数据。文章来源:https://www.toymoban.com/news/detail-717798.html
原创作品如需引用请标明出处文章来源地址https://www.toymoban.com/news/detail-717798.html
到了这里,关于大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!