集群上运行pyspark

这篇具有很好参考价值的文章主要介绍了集群上运行pyspark。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、PySpark应用背景
大规模结构化数据处理要求;
scala编写函数对开发人员接受度低,python的高度简化代码完美契合数据处理过程;
和scala处理无任何性能上的差异;
二、PySpark原理
Spark是什么:
分布式(集群部署),高性能(基于内存可缓存磁盘),高可用的基于RDD(分区的不可变的弹性分布数据集)有向无环数据处理引擎。
Spark现在常用的应用:Spark Sql
Spark sql是什么:
为结构化的数据处理提供sql支持,spark通过catalyst优化引擎对sql进行分解,逻辑及硬件优化解析,最后将sql通过code generation转化为可执行代码。总的来说spark sql引用了sql来处理HDFS上面结构化数据,隔离了代码,开发成本极低,学习成本=mysql学习成本。
为什么要有PySpark:
PySpark(python+spark)Spark支持开发语言有python,为什么不用呢?scala上手过于麻烦,且scala是基于jvm的类java语言,外行很难上手。
PySpark中的应用实例:
Pyspark Sql+python代码,亦可以结合pandas各种交叉开发。
三、Pyspark实战
无需多言直接贴代码
1、spark实例创建,可作为工具类直接引入实现,以下为spark应用实例初始化。(这一步骤在spark集群内部手动封装了,无需频繁创建,详情见:关于Spark应用公共配置封装到PySpark)
from pyspark.sql import SparkSession

def create_spark_session(app_name):
# 生产用
spark = (SparkSession.builder
.appName(app_name) #应用名称:可写可不写,自己理解
.enableHiveSupport() #hive支持
.config(“spark.sql.parquet.compression.codec”, “snappy”) #config为参数优化
.config(“hive.exec.dynamic.partition”, “true”)
.config(“hive.exec.dynamic.partition.mode”, “nonstrict”)
.config(“parquet.compression”, “SNAPPY”)
.config(“hive.exec.max.dynamic.partitions”, “3000”)
.config(“parquet.enable.dictionary”, “false”)
.config(“hive.support.concurrency”, “true”)
.config(“spark.sql.hive.convertMetastoreParquet”, “false”)
.config(“spark.sql.parquet.writeLegacyFormat”, “true”)
.config(“spark.sql.shuffle.partitions”, 500)
.config(“spark.default.parallelism”, 100)
.config(“spark.storage.memoryFraction”, 0.5)
.config(“spark.reducer.maxSizeInFlight”, “128”)
.config(“spark.shuffle.memoryFraction”, 0.3)
.config(“spark.shuffle.file.buffer”, “32m”)
.config(“spark.shuffle.sort.bypassMergeThreshold”, “300”)
.config(“spark.shuffle.io.maxRetries”, “60”)
.config(“spark.shuffle.io.retryWait”, “60s”)
.config(“spark.shuffle.consolidateFiles”, “true”)
.getOrCreate()) #通过Sparksession创建实例
# 本地用
# spark = (SparkSession.builder
# .enableHiveSupport()
# .appName(“MyApp”)
# .config(“spark.driver.memory”, “8g”) #本地执行,资源分配
# .config(“spark.executor.memory”, “8g”)
# .config(“spark.executor.cores”, “10”)
# .getOrCreate())
return spark

2、引用实例,操作写sql
import create_spark_session #引入上一步实例
spark = create_spark_session(“pj_js”) #创建实例,pj_js为应用名,自定义。
#无需多言,写sql,spark中写sql需要用spark.sql(“sql内容”).来包围
#.show展示数据,show()中可以带参数100,100为展示100行
spark.sql(“select * from **”).show(100)

#创建临时表为:
spark.sql(“select * from **”).createOrReplaceTempView(“jg_table”)

#临时表可查询
spark.sql(“select * from jg_table”).show(100)

#临时表可生成DataFrame
val jgDF = spark.sql(“select * from jg_table”).ToDF(“nsrsbh”,“nsrmc”,“djrq”)

#DataFrame创建临时表
jgDF.createOrReplaceTempView(“ls_table”)

#DataFrame选取一列作为参数传入python自定义函数如:
result = findCombinations(values_list, jgDF.select(“hkje”).collect()[0][0])

#DataFrame引入pandas进行操作

未完待续。。。
3、如何运行?
通过spark submit提交到大数据集群(因为有spark集群),集群通过yarn管理和调度资源,因为是用了spark,资源会就近原则,就近计算节点分区数据,高性能!。
提交脚本参考:
sudo -u hive /opt/spark/bin/spark-submit --master spark://centos2:7077 --driver-memory=4G --executor-memory 4g --conf spark.cores.max=28 /home/app/your_python_file.py

四、PySpark如何各种工具交叉操作数据

未完待续。。。

五、注意

1、实际生产加上这个,因为每台服务器上面的集群一开始没配置完善,导致部分集群spark中python环境不一致
import os
os.environ[“PYSPARK_PYTHON”]=“/var/lib/hive/anaconda3/bin/python3”
os.environ[“PYSPARK_DRIVER_PYTHON”]=“/var/lib/hive/anaconda3/bin/python3”

2、PySpark日志管理:INFO,DEBUG,ERROR

spark内置日志管理:

      spark.sparkContext.setLogLevel("INFO")

或者python日志管理下:
import logging
logging.basicConfig(level=logging.INFO, format=‘%(asctime)s - %(name)s - %(levelname)s - %(message)s’)
log.info(‘这是一条日志提醒你代码运行成功,数据库已初始化’)

3、由于我对pyspark实战第一步创建实例函数进行了封装,所以代码中无需重复创建create_spark_session函数。实际生产代码为:

coding:utf-8

from pyspark.create_session import create_spark_session

spark = create_spark_session(“**”)

spark.sql(“select * from **”).show(10)

4、

六、总结
简洁明了的Python语言 + 简单的Sql + 高性能的spark计算引擎 + 任务提交的简单可靠(不用打成jar包,代码vi就能修改操作)
处理点数据还是没多大问题
spark中的spark streaming没有实战过,但是也是支持实时流的,区别flink一个是触发型一个是非触发型数据流处理。

!!!待完善中…文章来源地址https://www.toymoban.com/news/detail-846241.html

到了这里,关于集群上运行pyspark的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】

    视频地址: 尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程、案例实操)】 尚硅谷大数据技术Spark教程-笔记03【SparkSQL(概述、核心编程、

    2023年04月21日
    浏览(48)
  • 大数据学习06-Spark分布式集群部署

    配置好IP vim /etc/sysconfig/network-scripts/ifcfg-ens33 修改主机名 vi /etc/hostname 做好IP映射 vim /etc/hosts 关闭防火墙 systemctl status firewalld systemctl stop firewalld systemctl disable firewalld 配置SSH免密登录 ssh-keygen -t rsa 下载Scala安装包 配置环境变量 添加如下配置 使环境生效 验证 Spark官网 解压 上

    2024年02月10日
    浏览(70)
  • Spark大数据处理讲课笔记4.1 Spark SQL概述、数据帧与数据集

      目录 零、本讲学习目标 一、Spark SQL (一)Spark SQL概述 (二)Spark SQL功能 (三)Spark SQL结构 1、Spark SQL架构图 2、Spark SQL三大过程 3、Spark SQL内部五大组件 (四)Spark SQL工作流程 (五)Spark SQL主要特点 1、将SQL查询与Spark应用程序无缝组合 2、Spark SQL以相同方式连接多种数据

    2024年02月09日
    浏览(64)
  • 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境)】

    视频地址: 尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程、案例实操)】 尚硅谷大数据技术Spark教程-笔记03【SparkSQL(概述、核心编程、

    2023年04月15日
    浏览(55)
  • Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

      目录 零、本讲学习目标 一、基本操作 二、默认数据源 (一)默认数据源Parquet (二)案例演示读取Parquet文件 1、在Spark Shell中演示 2、通过Scala程序演示 三、手动指定数据源 (一)format()与option()方法概述 (二)案例演示读取不同数据源 1、读取房源csv文件 2、读取json,保

    2024年02月09日
    浏览(44)
  • Spark SQL数据源:JDBC

    Spark SQL还可以使用JDBC API从其他关系型数据库读取数据,返回的结果仍然是一个DataFrame,可以很容易地在Spark SQL中处理,或者与其他数据源进行连接查询。 在使用JDBC连接数据库时可以指定相应的连接属性 属性 介绍 url 连接的JDBC URL driver JDBC驱动的类名 user 数据库用户名 pass

    2024年02月09日
    浏览(41)
  • 【Spark大数据习题】习题_Spark SQL&&&Kafka&& HBase&&Hive

    PDF资源路径-Spark1 PDF资源路径-Spark2 一、填空题 1、Scala语言的特性包含面向对象编程、函数式编程的、静态类型的、可扩展的、可以交互操作的。 2、在Scala数据类型层级结构的底部有两个数据类型,分别是 Nothing和Null。 3、在Scala中,声明变量的有var声明变量和val声明常

    2024年02月06日
    浏览(45)
  • Spark SQL数据源:Hive表

    Spark SQL还支持读取和写入存储在Apache Hive中的数据。然而,由于Hive有大量依赖项,这些依赖项不包括在默认的Spark发行版中,如果在classpath上配置了这些Hive依赖项,Spark就会自动加载它们。需要注意的是,这些Hive依赖项必须出现在所有Worker节点上,因为它们需要访问Hive序列化

    2024年02月11日
    浏览(38)
  • Spark SQL数据源 - 基本操作

    一、案例演示读取Parquet文件 执行命令: cd $SPARK_HOME/examples/src/main/resources ,查看Spark的样例数据文件users.parquet 将数据文件users.parquet上传到HDFS的/datasource/input目录 二、在Spark Shell中演示 启动Spark Shell,执行命令: spark-shell --master spark://master:7077 执行命令: val userdf = spark.read

    2024年02月09日
    浏览(42)
  • 电影评分数据分析案例-Spark SQL

    1. 2. 3. 4. 5. 6.

    2024年02月08日
    浏览(74)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包