spark读取数据写入hive数据表

这篇具有很好参考价值的文章主要介绍了spark读取数据写入hive数据表。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

spark 读取数据

spark从某hive表选取数据写入另一个表的一个模板 概述:

create_tabel建表函数,定义日期分区

删除原有分区drop_partition函数

generate_data 数据处理函数,将相关数据写入定义的表中 

注: 关于 insert overwrite/into 中partition时容易出的分区报错问题: 

添加分区函数add_partition

部分经验:多用传变量的形式; select最后的逗号要去掉; 


spark 读取数据

除了可以用spark.sql(sql语句如select...)外,对于不同的格式的数据也有相应的接口api ,得到spark dataframe的数据。 

如果csv : spark.read.csv('path') 或者   spark.createDataFrame(pandas.read_csv('path'))

如果是json : spark.read.json(path); 

如果是parquet :  spark.read.parquet(path); 

如果是text: spark.read.text(path) ; 

 注: 上面的spark 是pyspark 建的一个spark session 。 相关配置可以忽略,一般都是复用公司固定的。 

spark = SparkSession.builder.appName(VERSION).config(
   'spark.yarn.queue', '具体的队列名'  ## 这只是队列名-可忽略
).config(
    'spark.jars', 'lib/spark-tensorflow-connector_2.11-1.10.0.jar'
).config(
    'spark.dynamicAllocation.maxExecutors', '1000'
).config(
    'spark.executor.cores', '2'
).config(
    'spark.sql.autoBroadcastJoinThreshold', '-1'
).config("dfs.client.use.datanode.hostname", "true"
).config('spark.default.parallelism', '100'
).config('spark.yarn.executor.memoryOverhead', '4096'
).config('spark.core.connection.ack.wait.timeout', '300'
).config('spark.network.timeout', '120s'
).config('spark.executor.memory', '8G'
).config('spark.sql.merge.enabled', 'true'
).config('spark.driver.memory', '6G'       
).enableHiveSupport().getOrCreate()

spark从某hive表选取数据写入另一个表的一个模板 概述:

table_name = 'name'  # 要写入的目标表

date  = '2023-01-21'  # 取数据的日期

create_tabel(table_name) # 建表函数,表结构(要写入的数据表) ,建表时注意常用日期来分区

drop_partition(spark, table_name)  #删除原有函数, 如果原来有相关分区数据则进行删除 

generate_data(date, table_name)  # 读取数据函数并写入目标表 

add_partition(spark, table_name)  # 调整写入的分区 ,完成

create_tabel建表函数,定义日期分区

def create_tabel(table_name) : 

        create_table_sql = """ 

        CREATE TABLE IF NOT EXISTS  DB_NAME.{table_name} (

        column1  数据类型,

        dt_test ,string , .....

        count ,float 

        ) PARTITIONED BY ( year string,

                                            month string,

                                            day string

        )  STORED AS ORC 

        LOCATION  'DB_PATH/{table_name}'

        """.format(table_name = table_name) 

  spark.sql(create_table_sql)   # 执行建表语句。注意上面的分区形式。 
 

删除原有分区drop_partition函数

 def drop_partition(spark, table_name):

        alter_table_sql = '''

                ALTER TABLE ${DB_NAME}.{table_name} DROP IF EXISTS

                PARTITION (year = '{year}',month = '{month}', day = '{day}')

                '''.format(year = date[:4], month = date[5:7], day = date[8:10], table_name = table_name)

        spark.sql(alter_table_sql)

generate_data 数据处理函数,将相关数据写入定义的表中 

def generate_data(date, table_name): 

        get_data_part = spark.sql("""

        select 

                concat_ws('-', year, month, day) as dt_test ,

                count ,...更多数据列  (可以是经过sql处理解析后后得到的数据列,如json可以直接字典解析param['city'] as city_id 

        from  已有old表A 

        where 

                各种限制条件或者过滤条件 ,如时间限制 concat_ws('-', year, month, day) = '{date}'

        """.format(date= date )

).cache() # 缓存数据到内存,后期数据不用再反复执行,减少耗时

## 可以对Dataframe get_data_part再进行各种处理得到spark dataframe get_data_part_final  。

        aim_columns = [ 'col1', 'col2' , 'col3'...]  # aim_columns 传变量的形式,这里的数据column 一定要和建表时的数据一致,写入前select 后直接写入,以这种写法这样便可以减少对写入数据时列名对应不上等报错问题。另外要注意最后处理的得到的各列数据类型也一致。 

## 写入数据

        file_path = 'DB_PATH/{table_name}/{year}/{moth}/{day}'.format(year = date[:4],month=date[5:7],day=date[8:10],tabel_name= table_name) # 定义路径

#方式1 : 直接用spark  Dataframe 的write来写入。 

        get_data_part_final.select(aim_columns).coalesce(5).write.orc(path=file_path,mode='overwrite')   

上面写入语句中: .coalesce(5)是将数据文件写为指定个数5,这样可以减少数据倾斜现象,和.repartition(5)的功能无差别,这样就能控制保存的文件数量。 mode= 'overwrite'会覆盖之前的数据,如果将overwrite改为'append'会追加到表中。 

关于数据倾斜和小文件过多,可以见自己的另一个总结:http://t.csdn.cn/i7nv0

# 方式2  可以用spark sql 的方式来写

        get_data_part_final.createOrReplaceTempView("test_temp")

        spark.sql("""insert overwrite table db_name.{table_name}  partition(date={date}) select aim_columns  from test_temp""".format(table_name= table_name,date=date )

上面insert overwrite会重写数据,既先进行删除,再写入,对应方式1 的mode为overwrite 的形式。如果是追加写入表中(方式1重mode为append),则此时应该改为insert into直接追加到表中数据的尾部。

insert into 和overwrite

spark.sql中 注意 insert into 后的 table 标识可写可不写,但是insert overwrite 不行!

INSERT INTO [TABLE] [db_name.]table_name

INSERT OVERWRITE TABLE [db_name.]table_name

另外: ''' insert into tableNew [ partition(...)] select 字段 from 已有表''' 时,注意select 后的字段要确认和tableNew 中的表字段的先后顺序对应的上!不要随意用 select * 这个习惯不好! 另外注意字段的数量是否一致。

注: 关于 insert overwrite/into 中partition时容易出的分区报错问题: 

关于partition :

#  关于 partition的分区,如果是想以指定的某一天的日期进行分区,在insert into 语句 的partition里直接指定 分区的取值,如(year='2022'),插入的数据的分区则为指定取值的分区,此时下面的select语句中不用包括year,month,day字段了。注意此时 因为 insert 中的partition后已经指定了字段值,如果select 中再加入这几个字段就会显示字段值不匹配,会多了而报错,指定字段的值如下.如果想根据 选出的字段中的值进行分区,则 partition()里加入字段即可,注意select中要包括partition括号的字段,如 insert into tableA partition(year, month,day) select ... year,month,day ....

此处参考:pyspark--写hive分区表覆盖指定分区数据

一文搞定hive之insert into 和 insert overwrite与数据分区

添加分区函数add_partition

def add_partition(spark, table_name):

        alter_table_sql = '''    

                ALTER TABLE DB_NAME.{table_name} ADD IF NOT EXISTS    

                PARTITION (year = '{year}',month = '{month}', day = '{day}')    

                LOCATION 'DB_PATH/{table_name}/{year}/{month}/{day}'    

                '''.format(year = date[:4], month = date[5:7], day = date[8:10], table_name = table_name)             

        spark.sql(alter_table_sql)

这个如果在原来generate_data写入数据函数中有进行分区,其实可以不用再调用。 

部分经验:多用传变量的形式; select最后的逗号要去掉; 

之前自己遇到过的一些低级错误记录下, 

  1. 多处用到的名字尽量用变量传入进去到sql语句部分。前后用的view 和 register 相语句中tempTable 表名要一致! 多处用到的名字尽量用变量传入进去到sql语句部分,这样一个地方定义了就不会因为多次更改而容易出错,减少犯错的概率,养成良好的代码习惯。 在pyspark中形式如 spark.sql( ```sql 语句 中涉及变量为{var1}`,注意带着{}``.format(var1=值) ,如果在函数中的尽量用函数参数变量给值,这样当变量值涉及多个函数用到时方便同意给值。
  2. 括号要对应上,select 语句中的最后一个字段不能有逗号!有时候日志不一定能报错出来!要注意检查。通常报错 “  org.apache.spark.sql.AnalysisException: cannot resolve '`reward`(第一个字段)' given input columns: [].... 'InsertIntoTable 'UnresolvedRelation `db.table`, Map(字段)...False”时要注意去看是否字段不对应,是否最后一个字段多了多余的分割符号等!

参考: 

Hive/Spark小文件解决方案(企业级实战) - 腾讯云开发者社区-腾讯云

PySaprk 将 DataFrame 数据保存为 Hive 分区表 | XinanCSD.github.io文章来源地址https://www.toymoban.com/news/detail-803398.html

到了这里,关于spark读取数据写入hive数据表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • python 读取文件夹下子文件夹下的csv数据表

    可以使用 Python 中的  os  和  csv  模块来读取文件夹下的文件夹的 csv 数据。具体的步骤如下: 导入  os  和  csv  模块。 使用  os  模块的  listdir  函数列举出目标文件夹下的所有子文件夹的名称。 对于每个子文件夹,列举出其中的所有 .csv 文件的名称,并逐一读取 csv

    2024年02月11日
    浏览(72)
  • 数据库实验2 创建数据表修改数据表和删除数据表

    实验2 创建数据表修改数据表和删除数据表 实验类型: ●验证性实验  ○综合性实验  ○设计性实验 实验目的:      (1)了解数据表的结构特点。      (2)掌握表中列定义时所用到的各种数据类型。      (3)学会使用企业管理器(即MSSMS-----Microsoft SQL Server Manag

    2024年02月07日
    浏览(72)
  • 构建大数据环境:Hadoop、MySQL、Hive、Scala和Spark的安装与配置

    在当今的数据驱动时代,构建一个强大的大数据环境对于企业和组织来说至关重要。本文将介绍如何安装和配置Hadoop、MySQL、Hive、Scala和Spark,以搭建一个完整的大数据环境。 安装Hadoop 首先,从Apache Hadoop的官方网站下载所需的Hadoop发行版。选择适合你系统的二进制发行版,下

    2024年02月11日
    浏览(50)
  • oracle数据表转换为mysql数据表

    oracle数据表转换为mysql数据表,或者反过来,我们可以借助navica的工具 1.打开navicat的工具-数据传输 2.选择源数据库以及目标数据库 目标可以选择数据库也可以选择文件,目标数据库需要提前建好表,这里是选择文件,注意选择一个文件,sql格式即为目标数据库类型,这里不

    2024年02月16日
    浏览(60)
  • MySql基础教程(三):创建数据表、数据增删改查、删除数据表

    创建MySQL数据表需要以下信息: 表名 表字段名 定义每个表字段 1.1 语法 下面是创建MySQL数据表的SQL通用语法: 以下例子在 nobug 数据库中创建数据表 nobug_user : 实例解析: 如果你不想字段为 NULL 可以设置字段的属性为 NOT NULL, 在操作数据库时如果输入该字段的数据为NULL ,

    2024年02月11日
    浏览(64)
  • Spark连接Hive读取数据

            Ubuntu 16.04 LTS         ubuntu-16.04.6-desktop-i386.iso          spark-3.0.0-bin-without-hadoop.tgz           hadoop-3.1.3.tar.gz         apache-hive-3.1.2-bin.tar.gz         spark-hive_2.12-3.2.2.jar         openjdk 1.8.0_292         mysql-connector-java-5.1.40.tar.gz         

    2024年02月01日
    浏览(39)
  • 大数据平台安装实验: ZooKeeper、Kafka、Hadoop、Hbase、Hive、Scala、Spark、Storm

    ​ 在大数据时代,存在很多开源的分布式数据采集、计算、存储技术,本实验将在熟练掌握几种常见Linux命令的基础上搭建几种常用的大数据采集、处理分析技术环境。 相关安装包下载: 链接:https://pan.baidu.com/s/1Wa2U3qstc54IAUCypcApSQ 提取码:lcd8 Hadoop大数据平台所需工具、软件

    2023年04月09日
    浏览(88)
  • 实现一个MYSQL工具类,包含判断创建数据表是否存在,创建数据表

    可以使用Python的MySQLdb模块来实现一个MYSQL工具类。下面是一个简单的实现示例: 使用示例: 在上面的示例中,我们首先创建了一个MySQLTool类,并在初始化方法中传入了数据库的连接信息。然后使用connect方法连接到数据库。 table_exists方法用于判断给定的数据表是否存在,它执

    2024年01月15日
    浏览(61)
  • 【MySQL】MySQL 数据类型,数值、日期和时间、字符串类型,创建数据表,删除数据表

    作者简介: 辭七七,目前大一,正在学习C/C++,Java,Python等 作者主页: 七七的个人主页 文章收录专栏: 七七的闲谈 欢迎大家点赞 👍 收藏 ⭐ 加关注哦!💖💖 MySQL 中定义数据字段的类型对你数据库的优化是非常重要的。 MySQL 支持多种类型,大致可以分为三类:数值、日

    2024年02月15日
    浏览(74)
  • excel vba 将多张数据表的内容合并到一张数据表

    功能描述:  一个Excel文件有很多个 样式相同 的数据表, 需要将多张数据表的内容合并到一张数据表里。 vba实现代码如下:  文件链接:数据表合并.bas 下载后直接在excel 查看代码处导入文件即可。

    2024年02月11日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包