Spark 增量抽取 Mysql To Hive

这篇具有很好参考价值的文章主要介绍了Spark 增量抽取 Mysql To Hive。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

题目要求:

  1. 抽取ds_db01库中customer_inf的增量数据进入Hive的ods库中表customer_inf。根据ods.user_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.customer_inf命令;

代码实现: 

package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.time.LocalDate


object Demo {
  def main(args: Array[String]): Unit = {
//    创建spark
    val conf = new SparkConf().setMaster("local[*]").setAppName("one")
      .set("spark.testing.memory", "2147480000").set("dfs.client.use.datanode.hostname", "true")
    System.setProperty("HADOOP_USER_NAME", "root")

// 连接hive
val spark = SparkSession.builder()
  // 配置 Hive Metastore 的连接地址
  .config("hive.metastore.uris", "thrift://192.168.23.60:9083")
  // 配置 Hive 数据仓库的存储位置
  .config("hive.metastore.warehouse", "hdfs://192.168.23.60://9000/user/hive/warehouse")
  // 配置 Spark SQL 的存储分配策略为 "LEGACY"
  .config("spark.sql.storeAssignmentPolicy", "LEGACY")
  // 添加其他自定义的 Spark 配置
  .config(conf)
  // 启用对 Hive 的支持,使得可以使用 Hive 的表和查询
  .enableHiveSupport()
  // 创建 SparkSession 对象
  .getOrCreate()


//连接mysql
    spark.read.format("jdbc")
      .option("url","jdbc:mysql://192.168.23.60:3306/ds_db01??characterEncoding=UTF-8")
      .option("driver","com.mysql.jdbc.Driver")
      .option("user","root")
      .option("password","123456")
      .option("dbtable","customer_inf")
      .load().createOrReplaceTempView("v")  //对该表创建视图
    spark.sql("select * from v")


//    获取当天时间的前一天
val unit = java.time.LocalDate.now().plusYears(-1).plusMonths(-1).plusDays(-1).toString().replace("-", "")
    val unit1 = unit.toInt
//全量抽取
//    spark.sql(
//      s"""
//         |insert overwrite table gh_test.customer_inf
//         |partition (etl_date="${unit}")
//         |select * from v
//         |
//         |""".stripMargin).show()
//
//spark.sql("select * from gh_test.customer_inf").show

//将modified_time类型转换为yyyyMMdd
    spark.sql(
        s"""
         |select  customer_inf_id,customer_id,customer_name,identity_card_type,identity_card_no,mobile_phone,

         |customer_email,gender,customer_point,register_time,birthday,customer_level,customer_money,
         |from_unixtime(unix_timestamp(modified_time,'yyyy-MM-dd'),'yyyyMMdd') as modified_time
         |from v
         |""".stripMargin).createOrReplaceTempView("v1")
//      spark.sql("select  count(*) from gh_test.customer_inf").show

//从mysql中增量抽取到hive
    spark.sql(
      s"""
         |insert overwrite table gh_test.customer_inf
         |partition (etl_date="${unit}")
         |select * from v where  modified_time>"${unit1}"
         |""".stripMargin
    ).show()


//  spark.sql("select * from gh_test.customer_inf").show

//    查询抽取后的条数
    spark.sql("select  count(*) from gh_test.customer_inf").show
//    spark.sql("desc gh_test.customer_inf")


  }
}

文章来源地址https://www.toymoban.com/news/detail-752171.html

到了这里,关于Spark 增量抽取 Mysql To Hive的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 编写 Scala 工程代码,将 MySQL 库中的表增量抽取到 Hive库中对应表中

    提示:本文采用IDEA编写代码 搭建好spark,推荐一篇较好的文章:spark3.3.0安装部署过程。 注:如果需要运行 yarn 模式,在 spark-env.sh 文件末尾添加: export YARN_CONF_DIR=/opt/hadoop-3.1.3/etc/hadoop export HADOOP_CONF_DIR=/opt/hadoop-3.1.3/etc/hadoop 搭建完spark 集群 后,在spark的 jars 目录下放入mys

    2023年04月14日
    浏览(42)
  • Spark读写MySQL数据库

    一、读取数据库 (一)通过RDD的方式读取MySQL数据库 四要素:驱动、连接地址、账号密码 (二)通过DataFrame的方式读取MySQL数据库 二、添加数据到MySQL (一)通过RDD的方式插入数据到MySQL 每个分区执行一次创建连接和关闭连接 (二)通过RDD的方式插入数据到MySQL 2 每个分区

    2024年04月23日
    浏览(21)
  • 将spark的数据保存到MySQL

    我们用spark对数据进行分析和提取数据后要对得到的数据进行保存接下来的内容是将数据保存到MySQL数据库中 (本小博主已经为看官大人准备好了下载地址点击下载即可) 下载地址 下载完成后我们对这个压缩包进行解压(当然不解压直接给他拽出来也不犯毛病) 就是下面画

    2024年02月10日
    浏览(27)
  • Hudi Spark-SQL增量查询数据几种方式

    由于项目上主要用Hive查询Hudi,所以之前总结过一篇:Hive增量查询Hudi表。最近可能会有Spark SQL增量查询Hudi表的需求,并且我发现目前用纯Spark SQL的形式还不能直接增量查询Hudi表,于是进行学习总结一下。 先看一下官方文档上Spark SQL增量查询的方式,地址:https://hudi.apache.or

    2024年02月11日
    浏览(30)
  • 【大数据技术】Spark-SQL如何连接MySQL并进行读写数据

    1、配置连接MySQL的驱动 根据自己安装的MySQL的版本,找到正确的驱动文件,我的是MySQL8.0.19 所以驱动文件是mysql-connector-java-8.0.19.jar,将其复制到spark目录下的jars文件夹下 my@ubuntu:/usr/local/spark245_h_local/jars$ cp /home/my/myfolder/mysql-connector-java-8.0.19/mysql-connector-java-8.0.19.jar . 2、spark

    2024年02月08日
    浏览(28)
  • 构建大数据环境:Hadoop、MySQL、Hive、Scala和Spark的安装与配置

    在当今的数据驱动时代,构建一个强大的大数据环境对于企业和组织来说至关重要。本文将介绍如何安装和配置Hadoop、MySQL、Hive、Scala和Spark,以搭建一个完整的大数据环境。 安装Hadoop 首先,从Apache Hadoop的官方网站下载所需的Hadoop发行版。选择适合你系统的二进制发行版,下

    2024年02月11日
    浏览(41)
  • MySQL与Spark集成实践

    在大数据时代,数据的处理和分析已经成为企业的核心竞争力。MySQL作为最流行的关系型数据库之一,被广泛应用于各种业务场景中。而Apache Spark则是一个大规模数据处理的统一分析引擎,它提供了一种简单、通用的方式来处理大规模数据。本文将介绍如何将MySQL与Spark集成,

    2024年02月21日
    浏览(74)
  • Spark 写 MySQL经典50题

    目录 建表 添加数据 表结构分析图 连接数据库 题目 1、查询\\\"01\\\"课程比\\\"02\\\"课程成绩高的学生的信息及课程分数 2、查询\\\"01\\\"课程比\\\"02\\\"课程成绩低的学生的信息及课程分数 3、查询平均成绩大于等于60分的同学的学生编号和学生姓名和平均成绩 4、查询平均成绩小于60分的同学的学

    2023年04月13日
    浏览(25)
  • 【Spark+Hadoop+Hive+MySQL+Presto+SpringBoot+Echarts】基于大数据技术的用户日志数据分析及可视化平台搭建项目

    点我获取项目数据集及代码 随着我国科学技术水平的不断发展,计算机网络技术的广泛应用,我国已经步入了大数据时代。在大数据背景下,各种繁杂的数据层出不穷,一时难以掌握其基本特征及一般规律,这也给企业的运营数据分析工作增添了不小的难度。在大数据的背景

    2024年02月10日
    浏览(44)
  • 大数据平台组件日常运维操作说明(Hadoop/Zookeeper/Kafa/ES/Mysql/Spark/Flume/Logstash/Tomcat)

    hdfs 生产环境hadoop为30台服务器组成的集群,统一安装配置,版本号为2.7.7 部署路径:/opt/hadoop 启动用户:hadoop 配置文件: /opt/hadoop/config/hdfs-site.xml /opt/hadoop/config/core-site.xml hadoopy运行环境变量配置文件: hadoop-env.sh journalnode.env datanode.env namenode.env hadoop系统服务配置文件: z

    2024年02月03日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包