大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)

这篇具有很好参考价值的文章主要介绍了大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

前言

题目:

一、读题分析

二、处理过程

1.采用SparkSQL使用max函数找到最大的日期然后转换成时间类型在变成字符串

2.这里提供除了SQL方法外的另一种过滤不满足条件的方法

三、重难点分析

总结 


前言

本题来源于全国职业技能大赛之大数据技术赛项电商赛题-离线数据处理-抽取

题目:

spark抽取mysql数据,大数据技术,大数据,hive,spark,mysql,scala


提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写) 

一、读题分析

涉及组件:MYSQL,HIVE,SCALA,SPARK

涉及知识点:

  1. Spark读取数据库数据
  2. DataFrameAPI的使用(重点)
  3. Spark写入数据库数据
  4. Hive数据库的基本操作
  5. 增量数据的概念(思考:与全量数据有什么区别?)

二、处理过程

  与全量数据类似,唯一不同的点在于抽取增量的数据只是在全量数据中的一部分(形象来说)。个人认为,这样在实际应用中,抽取增量数据比抽取全量数据,更节省时间,带宽,硬件处理频率。总来说,抽取增量数据而不是全量数据的目的就是减少资源的浪费。

1.采用SparkSQL使用max函数找到最大的日期然后转换成时间类型在变成字符串


import org.apache.spark.sql.SparkSession
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

object MysqlToHive {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.lit

    val spark =SparkSession.builder().appName("mysqltoHive").master("spark://bigdata1:7077").enableHiveSupport().getOrCreate()

    //    读取mysql的配置
    val jdbcurl = "jdbc:mysql://bigdata1:3306/db"
    val tablename = "table1"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    properties.setProperty("driver", "com.mysql.jdbc.Driver")

    //    读取mysql数据创建dataframe
    val mysqlDF = spark.read.jdbc(jdbcurl, tablename, properties)
    mysqlDF.createOrReplaceTempView("mysqldata")

    //    读取hive数据ods库中最大的时间
    spark.sql("use ods")
    val hiveDF = spark.read.table("ods.table1")
    hiveDF.createOrReplaceTempView("hivedata")

    //    获取最大值
    val maxValue = spark.sql("select max(modified_time) from hivedata").head().getTimestamp(0).toString

     println("Hive最大的时间为:" + maxModifiedTime)
    //    3. 使用Spark SQL查询获取customer_inf表中modified_time的最大值。
    //    4. 使用head()方法获取结果集中的第一行数据。
    //    5. 使用getTimestamp(0)方法获取第一列数据的Timestamp类型值。
    //    6. 使用toString()方法将Timestamp类型值转换为字符串类型。
    //    7. 打印最大修改时间的字符串值。
    //    找到增量数据

    val resultDF = spark.sql(s"select * from mysqldata where momdified_time > '$maxValue'")

    //    取得昨天的日期
    //    法1:
    val sdf = new SimpleDateFormat("yyyyMMdd")
    val str = sdf.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)

    //    法2:
    val str = new SimpleDateFormat("yyyyMMdd").format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)

    val reDF = resultDF.withColumn("etl_date", lit(str))

    reDF.write.mode(SaveMode.Append).partitionBy("etl_date").saveAsTable("ods.table1")

  }
}

2.这里提供除了SQL方法外的另一种过滤不满足条件的方法

    //    这里可以写死来模拟增量数据
    //    val givenTime = "2022-08-23 00:00:00"
    val maxValue = spark.sql("select max(modified_time) from hivedata").head().getTimestamp(0).toString
    //    gt获取比givenTime时间大的数据
    //    lt小于
    val dataf = df.filter(col("modified_time").lt(max)).toDF()

三、重难点分析

  1. 增量数据与全量数据的不同
  2. SparkSQL函数的使用
  3. 解决增量数据的方法

总结 

什么是全量数据、增量数据?

全量数据和增量数据是在数据库系统迁移时的概念。

1.全量数据:

        当前需要迁移的数据库系统的全部数据。

2.增量数据:

        在数据库系统迁移过程中,对比原数据,新产生的数据即为增量数据。

原创作品如需引用请标明出处文章来源地址https://www.toymoban.com/news/detail-717798.html

到了这里,关于大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用sqoop将hive数据库导入至mysql

    前言:本文由实现此博客过程中遇到的问题及解决办法整理而成。博客:淘宝双11数据分析与预测课程案例-步骤三:将数据从Hive导入到MySQL_厦大数据库实验室 数据 数据放在 /usr/local/data/comment.csv 数据来源于:Amazon Reviews: Unlocked Mobile Phones | Kaggle 大数据开发环境 软件 版本 had

    2024年02月05日
    浏览(34)
  • Spark、RDD、Hive 、Hadoop-Hive 和传统关系型数据库区别

    Hive Hadoop Hive 和传统关系型数据库区别 Spark 概念 基于内存的分布式计算框架 只负责算 不负责存 spark 在离线计算 功能上 类似于mapreduce的作用 MapReduce的缺点 运行速度慢 (没有充分利用内存) 接口比较简单,仅支持Map Reduce 功能比较单一 只能做离线计算 Spark优势 运行速度快

    2024年02月13日
    浏览(37)
  • 【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive

    【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive 本需求将模拟从MySQL中向Hive数仓中导入数据,数据以时间分区。测试两种导入场景,一种是将数据全量导入,即包含所有时间分区;另一种是每天运行调度,仅导入当天时间分区中的用户数据。 mysql表建表语句:

    2024年02月03日
    浏览(34)
  • 处理大数据的基础架构,OLTP和OLAP的区别,数据库与Hadoop、Spark、Hive和Flink大数据技术

    2022找工作是学历、能力和运气的超强结合体,遇到寒冬,大厂不招人,可能很多算法学生都得去找开发,测开 测开的话,你就得学数据库,sql,oracle,尤其sql要学,当然,像很多金融企业、安全机构啥的,他们必须要用oracle数据库 这oracle比sql安全,强大多了,所以你需要学

    2024年02月08日
    浏览(47)
  • Spark读写MySQL数据库

    一、读取数据库 (一)通过RDD的方式读取MySQL数据库 四要素:驱动、连接地址、账号密码 (二)通过DataFrame的方式读取MySQL数据库 二、添加数据到MySQL (一)通过RDD的方式插入数据到MySQL 每个分区执行一次创建连接和关闭连接 (二)通过RDD的方式插入数据到MySQL 2 每个分区

    2024年04月23日
    浏览(21)
  • 大数据平台环境搭建---- Hive&MySql数据库组件配置

    前置环境 Hadoop集群必须部署完成,如果还没有搭建请先前往Hadoop全分布搭建笔记 程序版本 hive 1.1.0 mysql 5.7.25 mysql-connector-java-5.1.39-bin.jar 资源下载 官网下载: mysql-5.7.25-1.el7.x86_64.rpm-bundle.tar :https://downloads.mysql.com/archives/community/  链接:https://pan.xunlei.com/s/VNoQg4wdxda5by6L8Lvug9e

    2024年01月25日
    浏览(52)
  • 分布式数据库·Hive和MySQL的安装与配置

    一、版本要求:Hadoop:hadoop-2.10.1、MySQL:mysql-8.0.35、 HIVE :apache-hive-3.1.2、MySQL驱动:mysql-connector-java-5.1.49 安装包网盘链接:阿里云盘分享 安装位置  Hive:master、MySQL:slave1 二、卸载已安装的MySQL(如果不符合需求) 1.关闭MySQL服务 2.Yum检查 3.安装则直接删除 4.rpm检查 5.如果存在则删

    2024年02月03日
    浏览(47)
  • 【数据库】数据库多种锁模式,共享锁、排它锁,更新锁,增量锁,死锁消除与性能优化

    ​ 专栏内容 : 手写数据库toadb 本专栏主要介绍如何从零开发,开发的步骤,以及开发过程中的涉及的原理,遇到的问题等,让大家能跟上并且可以一起开发,让每个需要的人成为参与者。 本专栏会定期更新,对应的代码也会定期更新,每个阶段的代码会打上tag,方便阶段学

    2024年02月04日
    浏览(27)
  • 实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据

    目录 前言: 1、springboot引入依赖: 2、yml配置文件 3、创建SQL server CDC变更数据监听器 4、反序列化数据,转为变更JSON对象 5、CDC 数据实体类 6、自定义ApplicationContextUtil 7、自定义sink 交由spring管理,处理变更数据         我的场景是从SQL Server数据库获取指定表的增量数据,查

    2024年02月10日
    浏览(79)
  • 4、hive的使用示例详解-事务表、视图、物化视图、DDL(数据库、表以及分区)管理详细操作

    1、apache-hive-3.1.2简介及部署(三种部署方式-内嵌模式、本地模式和远程模式)及验证详解 2、hive相关概念详解–架构、读写文件机制、数据存储 3、hive的使用示例详解-建表、数据类型详解、内部外部表、分区表、分桶表 4、hive的使用示例详解-事务表、视图、物化视图、DDL

    2024年02月09日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包