PySpark 读写Hive数据源

这篇具有很好参考价值的文章主要介绍了PySpark 读写Hive数据源。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、环境配置

本文在Windows下配置Spark访问Hive。如需在Linux上配置,请对应Linux上同样的目录即可。

  1. 检查PySpark环境正常运行;检查Hive环境正常运行;启动Hive元数据服务

hive –service metastore

  1. 先将%HIVE_HOME%\conf\hive-site.xml拷贝到%SPARK_HOME%\conf。此步骤是为了Spark能读取Hive相应的配置;
  2. 再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%SPARK_HOME%\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了Spark能够访问Hive的元数据库。

此时,正常启动PySpark交互程序,可在交互模式下正常访问Hive了。进入交互环境,在提示符后直接输入以下代码:

spark.sql(‘show tables’).show()

正常执行后,应该能够看到default库中的表。这时,可以配置Python IDE的开发环境了

在dataphin平台上通过pyspark连接本地的hive服务,hive,hadoop,数据仓库 

  1. 检查pycharm或其他IDE中的PySpark的开发环境正常;
  2. 再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%PYTHONDIR%\Lib\site-packages\pyspark\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了在IDE环境中能够访问Hive的元数据库;
  3. 添加环境变量SPARK_CONF_DIR,变量值为%SPARK_HOME%\conf。此步骤是为了在IDE中运行Spark程序时,能够读取Spark配置目录下的相应配置信息;

此时,正常启动pycharm。可在IDE环境下正常访问Hive了。在pycharm的工程中新建一个Python文件,输入以下代码:

from pyspark.sql import SparkSession

spark=SparkSession.builder.appName('sparkhive').master('local[*]').enableHiveSupport().getOrCreate()

spark.sql('show tables').show()

正常执行后,应该能够看到default库中的表。

在dataphin平台上通过pyspark连接本地的hive服务,hive,hadoop,数据仓库

二、读写Hive数据源

从Spark2.0开始,引入SparkSession 作为 DataSet 和 DataFrame API 的切入点,SparkSession封装了SparkConf、SparkContext 和 SQLContext。为了向后兼容,SQLContext 和 HiveContext也被保存下来。在实际写程序时,只需要定义一个SparkSession对象就可以了。不用使用SQLContext 和 HiveContext。

  1. SQLContext 和 HiveContext方式读写Hive数据

(1)读取数据

from pyspark.sql import HiveContext

from pyspark import SparkConf,SparkContext

conf=SparkConf().setMaster("local").setAppName("sparkhive")

sc=SparkContext(conf=conf)

# 创建HiveContext实例

hive_context = HiveContext(sc)

# 读取default.stocks表

stocks_df = hive_context.sql("SELECT * FROM stocks")

# 显示数据

stocks_df.show(10)

在dataphin平台上通过pyspark连接本地的hive服务,hive,hadoop,数据仓库

(2)写入数据

from pyspark.sql import HiveContext

from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

from pyspark import SparkConf,SparkContext

conf=SparkConf().setMaster("local").setAppName("sparkhive")

sc=SparkContext(conf=conf)

# 定义DataFrame的结构(与stocks表的结构一致)

schema = StructType([

    StructField("exchange_e", StringType(), True),

    StructField("symbol", StringType(), True),

    StructField("ymd", StringType(), True),

    StructField("price_open", FloatType(), True),

    StructField("price_high", FloatType(), True),

    StructField("price_low", FloatType(), True),

    StructField("price_close", FloatType(), True),

    StructField("volume", IntegerType(), True),

    StructField("price_adj_close", FloatType(), True)

])

hive_context = HiveContext(sc)

# 创建DataFrame

new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),

            ("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]

df_to_write = hive_context.createDataFrame(new_data, schema=schema)

# 注册为临时表以便进行后续操作

df_to_write.registerTempTable("temp_stocks")

# 将临时表中的数据插入到stocks表

hive_context.sql('''

    INSERT INTO TABLE stocks

    SELECT * FROM temp_stocks

''')

hive_context.sql("select * from stocks where exchange_e='SSE'").show()

在dataphin平台上通过pyspark连接本地的hive服务,hive,hadoop,数据仓库

此方法的读写操作也可以参看Spark2.1.0入门:连接Hive读写数据(DataFrame)(Python版)_厦大数据库实验室博客 Spark2.1.0入门:连接Hive读写数据(DataFrame)(Python版)_厦大数据库实验室博客

  1. SparkSession方式读取Hive数据

在Spark中,使用SparkSession(从Spark 2.0开始)可以方便地读取和写入Hive表。以下是如何在Python中使用PySpark进行操作的例子:

(1)读取数据

from pyspark.sql import SparkSession

# 初始化SparkSession并启用Hive支持

spark = SparkSession.builder\

    .appName("StocksDataWriteExample")\

    .enableHiveSupport()\

    .getOrCreate()

# 读取并显示stocks表的数据

spark.sql("SELECT * FROM stocks").show(10) 在dataphin平台上通过pyspark连接本地的hive服务,hive,hadoop,数据仓库

(2)写入数据

from pyspark.sql import SparkSession

# 初始化SparkSession并启用Hive支持

spark = SparkSession.builder \

        .appName("StocksDataWriteExample") \

        .enableHiveSupport() \

        .getOrCreate()

# 定义数据和列结构(与stocks表结构一致)

columns = ["exchange_e", "symbol", "ymd", "price_open", "price_high", "price_low", "price_close", "volume", "price_adj_close"]

new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),

            ("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]

# 创建DataFrame

df_to_write = spark.createDataFrame(new_data, schema=columns)

# 写入数据到stocks表,这里假设mode为'append'(追加模式)

df_to_write.write \

.mode('append') \

.format('Hive') \

.saveAsTable('default.stocks')

(3)要注意的问题

Hive 3.0以后,默认建立的表是ORC格式的(不用在hive-site.xml中开启行级事务支持)。即可以支持INSERT,DELETE和UPDATE行级事务操作。但如果是在Hive交互命令行创建的表,在spark程序看来都是HiveFileFormat格式的表。因此,上面的代码中采用.format('Hive')。Spark会匹配相应的schema。要回避这个问题,也可以采用以下代码,即从一个临时表向目标表追加数据的方法。

# 创建一个与stocks表结构相同的临时表

df_to_write.createOrReplaceTempView("temp_stocks")

# 使用Hive SQL语句将临时表数据插入到stocks表

spark.sql("""

    INSERT INTO TABLE default.stocks

    SELECT * FROM temp_stocks

""")

spark.sql('select * from stocks limit 10').show()

在dataphin平台上通过pyspark连接本地的hive服务,hive,hadoop,数据仓库文章来源地址https://www.toymoban.com/news/detail-853730.html

到了这里,关于PySpark 读写Hive数据源的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spring AOP + 自定义注解 + 动态数据源 实现主从库切换&读写分离】—— 案例实战

                                                 💧 S p r i n g A O P + 主从数据源切换 + 读写分离 + 自定义注解案例实战! color{#FF1493}{Spring AOP + 主从数据源切换 + 读写分离 + 自定义注解 案例实战!} Sp r in g A OP + 主从数据源切换 + 读写分离 + 自定义注解案例

    2024年02月15日
    浏览(37)
  • 自助式数据分析平台:JVS智能BI功能介绍(一)数据源

    数据源概述 数据源是JVS-智能BI支持多种数据形态的基础,核心的目标是将不同的数据来源通过统一接入,实现将不同的数据实现统一的数据加工、数据应用。目前JVS-智能BI主要支持3种形态的数据:数据库、API、离线文件。 ​界面介绍 进入数据源界面,左侧展示的系统中已经

    2024年02月10日
    浏览(44)
  • 基于大数据平台(XSailboat)的计算管道实现MySQL数据源的CDC同步--flink CDC

    笔者在先前的一篇文档《数据标签设计 – 大数据平台(XSailboat)的数据标签模块》 提到了关于数据标签的模块,现已实现并应用于项目中。在项目中遇到这样一种情形: 如果打标信息和业务数据是在一个数据库实例中,那么只需要连接两张表进行查询即可。但是数据标签作为

    2024年01月17日
    浏览(61)
  • springboot整合多数据源的配置以及动态切换数据源,注解切换数据源

    在许多应用程序中,可能需要使用多个数据库或数据源来处理不同的业务需求。Spring Boot提供了简便的方式来配置和使用多数据源,使开发人员能够轻松处理多个数据库连接。如果你的项目中可能需要随时切换数据源的话,那我这篇文章可能能帮助到你 ℹ️:这里对于pom文件

    2024年02月10日
    浏览(49)
  • NamedParameterJdbcTemplate多数据源指定数据源

    实战例子记录 pom config NamedParameterJdbcTemplate(动态sql调用)

    2024年02月08日
    浏览(50)
  • 数据源作用以及spring配置数据源

    数据源,简单理解为数据源头,提供了应用程序所需要数据的位置。数据源保证了应用程序与目标数据之间交互的规范和协议,它可以是数据库,文件系统等等。其中数据源定义了位置信息,用户验证信息和交互时所需的一些特性的配置,同时它封装了如何建立与数据源的连

    2024年02月07日
    浏览(53)
  • SpringBoot——动态数据源(多数据源自动切换)

    日常的业务开发项目中只会配置一套数据源,如果需要获取其他系统的数据往往是通过调用接口, 或者是通过第三方工具比如kettle将数据同步到自己的数据库中进行访问。 但是也会有需要在项目中引用多数据源的场景。比如如下场景: 自研数据迁移系统,至少需要新、老两

    2024年02月16日
    浏览(39)
  • SpringBoot从数据库读取数据数据源配置信息,动态切换数据源

            首先准备多个数据库,主库smiling-datasource,其它库test1、test2、test3         接下来,我们在主库smiling-datasource中,创建表databasesource,用于存储多数据源相关信息。表结构设计如下         创建好表之后,向表databasesource中存储test1、test2、test3三个数据库的相关配置

    2024年01月16日
    浏览(63)
  • 【Spring Boot 3】【数据源】自定义多数据源

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年02月01日
    浏览(60)
  • 【Spring Boot 3】【数据源】自定义JPA数据源

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年01月21日
    浏览(69)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包