Python小案例(九)PySpark读写数据

这篇具有很好参考价值的文章主要介绍了Python小案例(九)PySpark读写数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Python小案例(九)PySpark读写数据

有些业务场景需要Python直接读写Hive集群,也需要Python对MySQL进行操作。pyspark就是为了方便python读取Hive集群数据,当然环境搭建也免不了数仓的帮忙,常见的如开发企业内部的Jupyter Lab

⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接公司hive集群的

利用PySpark读写Hive数据

# 设置PySpark参数
from pyspark.sql import *
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.instances", "20") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()

# 导入其他相关库
import pandas as pd
from datetime import datetime
import pymysql  # mysql连接库

创建hive表

sql_hive_create = '''
CREATE TABLE IF NOT EXISTS temp.hive_mysql
    (
        id int comment "id"
        ,dtype string comment "类型"
        ,cnt int comment "数量"
    )
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''

spark.sql(sql_hive_create)
DataFrame[]

写入hive表

sql_hive_insert = '''
insert overwrite table temp.hive_mysql


select 1 as id, 'A' as dtype, 10 as cnt

union all

select 2 as id, 'B' as dtype, 23 as cnt
'''

spark.sql(sql_hive_insert)
DataFrame[]

读取hive表

sql_hive_query = '''
select 
    id
    ,dtype
    ,cnt
from
    temp.hive_mysql
'''

df = spark.sql(sql_hive_query).toPandas()
df.head()
id dtype cnt
0 1 A 10
1 2 B 23

利用Python读写MySQL数据

连接mysql

# 数据库信息
config = {'host': '***',  # 默认127.0.0.1
          'user': '*',  # 用户名
          'password': '*',  # 密码
          'port': 3306  # 端口,默认为3306
          'database': 'dbname'  # 数据库名称
          }
# 校验关联是否成功
con = pymysql.connect(**config)  # 建立mysql连接
cursor = con.cursor()  # 获得游标
cursor.execute("show tables")  # 查询表
1335

创建mysql表

sql_mysql_create = '''
CREATE TABLE IF NOT EXISTS `hive_mysql`
    (
        `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键'
        ,`hmid` int(30) NOT NULL DEFAULT '0' COMMENT 'hmid'
        ,`dtype` varchar(30) NOT NULL DEFAULT 'total_count' COMMENT '类型'
        ,`cnt` int(30) NOT NULL DEFAULT '0' COMMENT '数量'

        ,`dbctime` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间'
        ,`dbutime` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '更新时间'

        ,PRIMARY KEY (`id`)
        ,UNIQUE KEY `u_key` (`dtype`)
    ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT '题目数量'
'''

# cursor.execute(sql_mysql_create) # 无建表权限,可申请权限或者内部管理工具手动建表

写入mysql表

insert_mysql_sql = '''
insert into hive_mysql (hmid, dtype, cnt) values (%s, %s, %s)
'''
try:
    con = pymysql.connect(**config)  # 建立mysql连接
    cursor = con.cursor()  # 获得游标
    
    # 清空数据
    cursor.execute('truncate table hive_mysql')
    
    for i in range(df.__len__()):
        # 插入的数据类型需要与数据库中字段类型保持一致
        cursor.execute(insert_mysql_sql, (int(df.iloc[i, 0]), df.iloc[i, 1], int(df.iloc[i, 2])))

    # 提交所有执行命令
    con.commit()
    print('数据写入成功!')
    cursor.close()  # 关闭游标
except Exception as e:
    raise e
finally:
    con.close()  # 关闭连接
数据写入成功!

读取mysql表

sql_mysql_query = '''
select 
    hmid
    ,dtype
    ,cnt
from
    hive_mysql
'''
try:
    con = pymysql.connect(**config)  # 建立mysql连接
    cursor = con.cursor()  # 获得游标
    
    cursor.execute(sql_mysql_query)  # 执行sql语句
    df_mysql = pd.DataFrame(cursor.fetchall())  # 获取结果转为dataframe

    # 提交所有执行命令
    con.commit()
    
    cursor.close()  # 关闭游标
except Exception as e:
    raise e
finally:
    con.close()  # 关闭连接
df_mysql.head()
0 1 2
0 1 A 10
1 2 B 23

利用PySpark写入MySQL数据

日常最常见的是利用PySpark将数据批量写入MySQL,减少删表建表的操作。但由于笔者当前公司线上环境没有配置mysql的驱动,下述方法没法使用。

MySQL的安全性要求很高,正常情况下,分析师关于MySQL的权限是比较低的。所以很多关于MySQL的操作方法也是无奈之举~

# ## 线上环境需配置mysql的驱动
# sp = spark.sql(sql_hive_query)
# sp.write.jdbc(url="jdbc:mysql://***:3306/dbname",   # dbname为库名,必须已存在(该语句不会创建库)
#               mode="overwrite",     # 模式分为overwrite 重写表    append表内内容追加
#               table="hive_mysql",    # 表名,表不需要去创建,可以自己生成
#               properties={'driver':'com.mysql.jdbc.Driver', 'user':'*', 'password':'*'})

总结

Python读取Hive数据,以及利用Python关联Hive和MySQL是后续自动化操作的基础,因此简单的理解PySpark如何进行Hive操作即可。

共勉~文章来源地址https://www.toymoban.com/news/detail-524770.html

到了这里,关于Python小案例(九)PySpark读写数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《PySpark大数据分析实战》-26.数据可视化图表Seaborn介绍

    📋 博主简介 💖 作者简介:大家好,我是wux_labs。😜 热衷于各种主流技术,热爱数据科学、机器学习、云计算、人工智能。 通过了TiDB数据库专员(PCTA)、TiDB数据库专家(PCTP)、TiDB数据库认证SQL开发专家(PCSD)认证。 通过了微软Azure开发人员、Azure数据工程师、Azure解决

    2024年01月21日
    浏览(45)
  • PySpark实战指南:大数据处理与分析的终极指南【上进小菜猪大数据】

    上进小菜猪,沈工大软件工程专业,爱好敲代码,持续输出干货。 大数据处理与分析是当今信息时代的核心任务之一。本文将介绍如何使用PySpark(Python的Spark API)进行大数据处理和分析的实战技术。我们将探讨PySpark的基本概念、数据准备、数据处理和分析的关键步骤,并提

    2024年02月06日
    浏览(47)
  • 《PySpark大数据分析实战》-12.Spark on YARN配置Spark运行在YARN上

    📋 博主简介 💖 作者简介:大家好,我是wux_labs。😜 热衷于各种主流技术,热爱数据科学、机器学习、云计算、人工智能。 通过了TiDB数据库专员(PCTA)、TiDB数据库专家(PCTP)、TiDB数据库认证SQL开发专家(PCSD)认证。 通过了微软Azure开发人员、Azure数据工程师、Azure解决

    2024年02月03日
    浏览(56)
  • Python大数据之PySpark(七)SparkCore案例

    PySpark实现SouGou统计分析 jieba分词: pip install jieba 从哪里下载pypi 三种分词模式 精确模式,试图将句子最精确地切开,适合文本分析;默认的方式 全模式,把句子中所有的可以成词的词语都扫描出来, 速度非常快,但是不能解决歧义; 搜索引擎模式,在精确模式的基础上,对

    2024年02月08日
    浏览(36)
  • PySpark 读写Hive数据源

    一、环境配置 本文在Windows下配置Spark访问Hive。如需在Linux上配置,请对应Linux上同样的目录即可。 检查PySpark环境正常运行;检查Hive环境正常运行;启动Hive元数据服务 hive –service metastore 先将%HIVE_HOME%confhive-site.xml拷贝到%SPARK_HOME%conf。此步骤是为了Spark能读取Hive相应的配置

    2024年04月16日
    浏览(44)
  • Python学习之PySpark案例实战

    Spark是什么 Apache Spark是用于 大规模数据(large-scala data)处理的统一 (unified) 分析引擎 。 简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。 Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发 而Python语

    2024年02月05日
    浏览(42)
  • Python学习路线 - Python高阶技巧 - PySpark案例实战

    Spark是什么 定义:Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。 简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃致EB级别的海量数据 Python On Spark Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开

    2024年02月21日
    浏览(49)
  • 【Python】PySpark 数据处理 ② ( 安装 PySpark | PySpark 数据处理步骤 | 构建 PySpark 执行环境入口对象 )

    执行 Windows + R , 运行 cmd 命令行提示符 , 在命令行提示符终端中 , 执行 命令 , 安装 PySpark , 安装过程中 , 需要下载 310 M 的安装包 , 耐心等待 ; 安装完毕 : 命令行输出 : 如果使用 官方的源 下载安装 PySpark 的速度太慢 , 可以使用 国内的 镜像网站 https://pypi.tuna.tsinghua.edu.cn/simple

    2024年02月06日
    浏览(43)
  • Python大数据之PySpark(二)PySpark安装

    1-明确PyPi库,Python Package Index 所有的Python包都从这里下载,包括pyspark 2-为什么PySpark逐渐成为主流? http://spark.apache.org/releases/spark-release-3-0-0.html Python is now the most widely used language on Spark. PySpark has more than 5 million monthly downloads on PyPI, the Python Package Index. 记住如果安装特定的版本

    2024年02月04日
    浏览(42)
  • Python大数据之PySpark

    Apache Spark是一种用于大规模数据处理的多语言分布式引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习 Spark官网:https://spark.apache.org/ 按照官网描述,Spark关键特征包括: 批/流处理 Spark支持您使用喜欢的语言:Python、SQL、Scala、Java或R,统一批量和实时流处

    2024年02月08日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包