CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总

这篇具有很好参考价值的文章主要介绍了CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

需求:内网通过Excel文件将数据同步到外网的CDH服务器中,将CDH中的文件数据写入hive中。

CDH版本为:6.3.2
spark版本为:2.4
python版本:2.7.5
操作系统:CentOS Linux 7
集群方式:yarn-cluster

一、在linux中将excel文件转换成CSV文件,然后上传到hdfs中。
为何要先转csv呢?主要原因是pyspark直接读取excel的话,涉及到版本的冲突问题。commons-collections-3.2.2.jar 在CDH6.3.2中的版本是3.2.2.但是pyspark直接读取excel要求collections4以上的版本,虽然也尝试将4以上的版本下载放进去,但是也没效果,因为时间成本的问题,所以没有做过多的尝试了,直接转为csv后再读吧。
spark引用第三方包

1.1 转csv的python代码(python脚本)

#-*- coding:utf-8 -*-
import pandas as pd
import os, xlrd ,sys

def xlsx_to_csv_pd(fn):
    path1="/home/lzl/datax/"+fn+".xlsx"
    path2="/home/lzl/datax/"+fn+".csv"
    data_xls = pd.read_excel(path1, index_col=0)
    data_xls.to_csv(path2, encoding='utf-8')

if __name__ == '__main__':
    fn=sys.argv[1]
    print(fn)
    try:
		xlsx_to_csv_pd(fn)
		print("转成成功!")
    except Exception as e:
		print("转成失败!")

1.2 数据中台上的代码(shell脚本):

#!/bin/bash
#@description:这是一句描述
#@author: admin(admin)
#@email: 
#@date: 2023-09-26 14:44:3

# 文件名称
fn="项目投运计划"

# xlsx转换成csv格式
ssh root@cdh02 " cd /home/lzl/shell; python xlsx2csv.py $fn" 

# 将文件上传到hfds上
ssh root@cdh02 "cd /home/lzl/datax; hdfs dfs -put $fn.csv /origin_data/sgd/excel/"
echo "上传成功~!"

# 删除csv文件
ssh root@cdh02 "cd /home/lzl/datax; rm -rf $fn.csv"
echo "删除成功~!"

二、pyspark写入hive中
2.1 写入过程中遇到的问题点
2.1.1 每列的前后空格、以及存在换行符等问题。采取的措施是:循环列,采用trim函数、regexp_replace函数处理。

# 循环对每列去掉前后空格,以及删除换行符
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replace

for name in df.columns:
    df = df.withColumn(name, F.trim(df[name]))
    df = df.withColumn(name, regexp_replace(col(name), "\n", ""))

2.1.2 个别字段存在科学计数法,需要用cast转换

from pyspark.sql.types import *

# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))

去掉换行符另一种方法:换行符问题也可以参照这个

2.2 数据中台代码(pyspark)

# -*- coding:utf-8
# coding=UTF-8

# 引入sys,方便输出到控制台时不是乱码
import  sys   
reload(sys)
sys.setdefaultencoding( "utf-8" )

# 引入模块
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext 
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import *

# 设定资源大小
conf=SparkConf()\
    .set("spark.jars.packages","com.crealytics:spark-excel_2.11:0.11.1")\
    .set("spark.sql.shuffle.partitions", "4")\
    .set("spark.sql.execution.arrow.enabled", "true")\
    .set("spark.driver.maxResultSize","6G")\
    .set('spark.driver.memory','6G')\
    .set('spark.executor.memory','6G')

# 建立SparkSession
spark = SparkSession \
    .builder\
    .config(conf=conf)\
    .master("local[*]")\
    .appName("dataFrameApply") \
    .enableHiveSupport() \
    .getOrCreate()

# 读取cvs文件
# 文件名称和文件位置
fp= r"/origin_data/sgd/excel/项目投运计划.csv"
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("multiLine", "true") \
    .option("delimiter", ",") \
    .format("csv") \
    .load(fp)

# 查看数据类型
# df.printSchema()

# 循环对每列去掉前后空格,以及删除换行符
for name in df.columns:
    df = df.withColumn(name, F.trim(df[name]))
    df = df.withColumn(name, regexp_replace(col(name), "\n", ""))

# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))

df.show(25,truncate = False) # 查看数据,允许输出25行

# 设置日志级别 (这两个没用)
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# 写入hive中
spark.sql("use sgd_dev")  # 指定数据库

# 创建临时表格 ,注意建表时不能用'/'和''空格分隔,否则会影响2023/9/4和2023-07-31 00:00:00这样的数据
spark.sql("""
CREATE TABLE IF NOT EXISTS ods_sgd_project_operating_plan_info_tmp (
    project_no                string         ,
    sale_order_no             string         ,
    customer_name             string         ,
    unoperating_amt           decimal(19,2)  , 
    expected_operating_time   string         ,
    operating_amt             decimal(19,2)  ,  
    operating_progress_track  string         ,
    is_Supplied               string         ,
    operating_submit_time     string         ,
    Signing_contract_time     string         ,
    remake                    string  
    )
 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'    
""")

# 注册临时表
df.createOrReplaceTempView("hdfs_df")
# spark.sql("select * from hdfs_df limit 5").show() #查看前5行数据

# 将数据插入hive临时表中
spark.sql("""
    insert overwrite table ods_sgd_project_operating_plan_info_tmp select * from hdfs_df
""")

# 将数据导入正式环境的hive中
spark.sql("""
    insert overwrite table ods_sgd_project_operating_plan_info select * from ods_sgd_project_operating_plan_info_tmp
""")

# 查看导入后的数据
spark.sql("select * from ods_sgd_project_operating_plan_info limit 20").show(20,truncate = False)

# 删除注册的临时表
spark.sql("""
    drop table hdfs_df
""")

# 删除临时表
spark.sql("""
    drop table ods_sgd_project_operating_plan_info_tmp
""")

关于spark的更多知识,可以参看Spark SQL总结文章来源地址https://www.toymoban.com/news/detail-729953.html

到了这里,关于CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • CDH6.3.2企业级安装实战

    1、环境介绍 IP 操作系统 联网 10.191.15.15 Centos 7.4 离网 10.191.15.16 Centos 7.4 离网 10.191.15.17 Centos 7.4 离网 10.191.15.18 Centos 7.4 离网 2、搭建本地Yum源 2.1 配置本地基础Yum源 1、上传镜像到服务器 下载的Centos镜像为 CentOS-7-x86_64-Everything-1708.iso , 放置目录为: /root/download

    2024年01月18日
    浏览(40)
  • CDH6.3.2-组件安装&安全认证

    1.选择自定义。 2.选择HDFS ZK YARN然后点继续。    3.选择安装的主机。 4.审核更改默认就行,点继续。  5.配置HDFS的HA。    安装好以后点击hdfs进入实例就能够看到启动了高可用。 6.启动YARN的高可用。         更具需求修改资源    一直点继续就行了                 在/

    2024年02月16日
    浏览(41)
  • CDH6.3.2 集成 Flink 1.17.0 失败过程

    目录 一:下载Flink,并制作parcel包 1.相关资源下载 2. 修改配置 准备工作一: 准备工作二: 3. 开始build 二:开始在CDH页面分发激活  三:CDH添加Flink-yarn 服务  四:启动不起来的问题解决 五:CDH6.3.2集群集成zookeeper3.6.3 六:重新适配Flink服务 环境说明: cdh版本:cdh6.3.2 组件版本信

    2024年01月17日
    浏览(28)
  • 服务器编译spark3.3.1源码支持CDH6.3.2

    1、一定要注意编译环境的配置 2、下载连接 3、安装直接解压,到/opt/softwear/文件夹 4、配置环境变量 5、更改相关配置文件 一定注意下面的修改配置 6、修改mvn地址 6.1、如果编译报错栈已经满了修改如下 7、更改 scala版本 8、执行脚本编译 9、打包完在/opt/softwear/spark-3.3.1 有一

    2023年04月15日
    浏览(42)
  • flink1.14.5使用CDH6.3.2的yarn提交作业

    使用CDH6.3.2安装了hadoop集群,但是CDH不支持flink的安装,网上有CDH集成flink的文章,大都比较麻烦;但其实我们只需要把flink的作业提交到yarn集群即可,接下来以CDH yarn为基础,flink on yarn模式的配置步骤。 一、部署flink 1、下载解压 官方下载地址:Downloads | Apache Flink 注意:CD

    2024年01月16日
    浏览(40)
  • Unrecognized Hadoop major version number: 3.0.0-cdh6.3.2

     一.环境描述 spark提交job到yarn报错,业务代码比较简单,通过接口调用获取数据,将数据通过sparksql将数据写入hive中,尝试各种替换hadoop版本,最后拿下 1.hadoop环境 2.项目 pom.xml 3.项目集群提交报错         at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupRelation(SessionCatalog

    2024年02月12日
    浏览(28)
  • cdh6.3.2 Flink On Yarn taskmanager任务分配倾斜问题的解决办法

    Flink On Yarn任务启动 CDH:6.3.2 Flink:1.13.2 Hadoop:3.0.0 在使用FLink on Yarn调度过程中,发现taskmanager总是分配在集中的几个节点上,集群有11个节点,但每个任务启动,只用到两三个节点,导致这几台服务器负载过高,其他节点又比较空闲。 1、yarn.scheduler.fair.assignmultiple 2、yarn.s

    2024年02月12日
    浏览(32)
  • python自动化办公——定制化读取Excel数据并写入到word表格

    最近到了毕业设计答辩的时候,老师让我帮毕业生写一段毕业设计的功能就是提供一个 学士学位授予申请表 ,根据定制化需求,编写定制化代码。 docx格式的word如下图。 再提供一个Excel表格,要求可以直接读取表格里的对应内容,填入到word表格里的对应位置。表格是我自己

    2024年02月10日
    浏览(46)
  • POI:从Excel文件中读取数据,向Excel文件中写入数据,将Excel表格中的数据插入数据库,将数据库中的数据添加到Excel表

    POI是Apache软件基金会用Java编写的免费开源的跨平台的 Java API,Apache POI提供API给Java程序对Microsoft Office格式档案读和写的功能。POI为“Poor Obfuscation Implementation”的首字母缩写,意为“可怜的模糊实现”。 所以POI的主要功能是可以用Java操作Microsoft Office的相关文件,但是一般我

    2024年02月10日
    浏览(39)
  • pycharm pyspark连接虚拟机的hive表 读取数据

    给个demo示例:

    2024年04月13日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包