Spark读取JDBC调优

这篇具有很好参考价值的文章主要介绍了Spark读取JDBC调优。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


实际问题:工作中需要读取一个存放了三四年历史数据的pg数仓表(缺少主键id),需要将数据同步到阿里云 MC中,Spark在使用JDBC读取关系型数据库时,默认只开启一个task去执行,性能低下,因此需要通过设置一些参数来提高并发度。一定要充分理解参数的含义,否则可能会因为配置不当导致数据倾斜!

翻看了网络上好多相关介绍,都沾边。下边总结一下!

您是菜鸟就好好学习,您是大佬欢迎提出修改意见!

一、场景构建

以100行数据为例(实际307983条):

  • 创建表
CREATE TABLE IF NOT EXISTS test(
	good_id STRING ,
	title STRING ,
	sellcount BIGINT,
	salesamount Double
)COMMENT '测试表'
PARTITIONED BY (
	dt	STRING	COMMENT '分区字段'
);
  • 插入数据
insert into test partition (dt = '202001') 
values ('1001','卫衣',1,100.1),('1002','卫裤',2,101.2),('1003','拖鞋',3,10.3)...,('1100','帽子',100,19.23)

二、参数设置

配置文件示例:

jdbc: &jdbc
  options.url: "jdbc:postgresql://xxx.xxx.xxx.xxx:8000/postgres"
  options.user: "xxxxxx"
  options.password: "xxxxxx"
  options.driver: "org.postgresql.Driver"

input:
 - moduleClass: "JDBC"
    <<: *jdbc
    options.dbtable: "SELECT *,cast(good_id as bigint)*1%6 mo FROM test.test where dt = '202001'"
    options.fetchsize: "100"
    options.partitionColumn: "mo" # 分区列,一般为自增id,下边解释下为啥用mo
    options.numPartitions: "6" #分区数
    options.lowerBound: "0"
    options.mytime: "${yyyy}-${MM}-${dd}"
    options.upperBound: "6" # 该值设置为和分区列最大值差不多的值
    resultDF: "df"

提交spark配置

  spark-submit \
    --class xx.xxx.xxx.xxx \
    --master local[*] \
    --num-executors 6 \
    --executor-cores 1 \
    --executor-memory 2G \
    --driver-memory 4G \
    /root/test/xxx.jar \
    -p xxx/xxx.yaml -cyctime $cyctime
  • options.fetchsize:一次性读取的数据条数,按集群规模(例:64核128G)一次1000条;阿里云Spark集群链接不了华为云pg数仓,我开了一台独立机器(8核16G)一次100条

  • options.partitionColumn:分区列,必须是bigint类型;

  • options.numPartitions:设置分区数,最好和spark提交的executors数一致;上文中spark任务数为6,分区数也为6

  • options.lowerBound:分区开始值

  • options.upperBound:分区结束值;numPartitions、lowerBound、upperBound这三个必须同时设置,每个分区的数据量计算公式为:upperBound / numPartitions - lowerBound / numPartitions,任务运行时间看的是最长的那个任务,所以要尽可能保证每一个分区的数据量差不多

官方配置文档:
Spark读取JDBC调优

1.灵活运用分区列

有的小伙伴就该思考为啥不用自增id做分区列呢?

因为实际生产环境中,一是不需要,二是创建表忽略了自增id等等。

为啥要新做一列mo,而不直接将商品id转bigint用呢?

算是一个补救措施,新做一个数据列,在读取过程用mo做shuffle,mo是商品id强转为bigint后对6取膜,结果为0-5共6种可能,提高了shuffle的效率,计算分区的数据量:6 / 6 - 0 / 6 = 1;也就是说分区值为0,1,2,3,4,(大于5),对应6个任务,6个核心。

下面是运行shuffle结束后的截图,可以看到每一个task获取的数据量都比较均匀

Spark读取JDBC调优
下面来看一个错误的案例:
Spark读取JDBC调优
上图配置就会导致数据倾斜
numPartitions=10,
lowerBound=0,
upperBound=100,
表的数据量是1000。
根据计算公式每个分区的数据量是100/10-0/10=10,分10个区,那么前9个分区数据量都是10,但最后一个分区数据量却达到了910,即数据倾斜了,所以upperBound-lowerBound要和表的分区字段最大值差不多

有啥需要优化的欢迎评论纠正文章来源地址https://www.toymoban.com/news/detail-407093.html

到了这里,关于Spark读取JDBC调优的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spark读取hive表字段,区分大小写问题

    背景 spark任务读取hive表,查询字段为小写,但Hive表字段为大写,无法读取数据 问题错误: 如何解决呢? In version 2.3 and earlier, when reading from a Parquet data source table, Spark always returns null for any column whose column names in Hive metastore schema and Parquet schema are in different letter cases, no matter wh

    2024年01月23日
    浏览(52)
  • 在工作中使用ChatGPT需要担心泄密问题吗?

      ​OpenAI的ChatGPT可以通过自动简化繁琐的任务,针对挑战性问题的提供创造性的解决方案来提高员工的生产力。但随着这项技术被整合到人力资源平台和其他工作场所中,它给各个企业带来了巨大的挑战。苹果、Spotify、Verizon和三星等大公司已禁止或限制员工在工作中使用这

    2024年02月14日
    浏览(37)
  • 【Elasticsearch】 实际生产中的监控及调优

       目录 监控 API 调优 1、CPU使用率 ES中导致CPU 变高的因素 ES导致CPU 变高的解决方案          2、内存使用率 ES内存使用率 过高的可能因素 ES内存使用率 过高的处理方案 3、ES磁盘使用率 ES磁盘使用率过高的可能因素 4、ES 中GC频次 ES 中GC频次增加的可能因素 ES 中GC频次降低

    2024年02月15日
    浏览(40)
  • Spark九:Spark调优之Shuffle调优

    map端和reduce端缓存大小设置,reduce端重试次数和等待时间间隔,以及bypass设置 学习资料:https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ 在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中

    2024年01月20日
    浏览(40)
  • Spark(30):Spark性能调优之常规性能调优

    目录 0. 相关文章链接 1. 最优资源配置 2. RDD优化 2.1. RDD复用 2.2. RDD持久化 2.3. RDD尽可能早的 filter 操作 3. 并行度调节 4. 广播大变量 5. Kryo序列化 6. 调节本地化等待时长  Spark文章汇总          Spark 性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增

    2024年02月16日
    浏览(60)
  • Spark调优解析-GC调优3(七)

    Spark立足内存计算,常常需要在内存中存放大量数据,因此也更依赖JVM的垃圾回收机制。与此同时,它也兼容批处理和流式处理,对于程序吞吐量和延迟都有较高要求,因此GC参数的调优在Spark应用实践中显得尤为重要。 按照经验来说,当我们配置垃圾收集器时,主要有两种策

    2024年02月19日
    浏览(38)
  • Spark on Yarn 最佳运行参数调优-计算方式_spark on yarn 调优 nodemanager

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新软件测试全套学习资料》

    2024年04月26日
    浏览(45)
  • Spark 参数调优

    目录 Spark 调优 一、代码规范 1.1 避免创建重复 RDD 1.2 尽量复用同一个 RDD 1.3 多次使用的 RDD 要持久化 1.4 使用高性能算子 1.5 好习惯 二、参数调优 资源参数 1.1 --num-executors 100 1.2 --executor-memory 5g 1.3 --executor-cores 4 1.4 --driver-memory 内存参数 spark.storage.memoryFraction、spark.shuffle.memor

    2024年04月11日
    浏览(31)
  • Spark:性能调优实战

    链接: 文字文档 极客链接 一、资源申请并行度 一个Executor中同时可以执行的task数目(在Executor内存不变的情况下,executor-cores数越大,平均下来一个task可以使用的内存就越少) Executor Java进程的堆内存大小,即Executor Java进程的Xmx值 Executor Java进程的off-heap内存,包括JVM over

    2024年04月16日
    浏览(45)
  • Spark SQL调优实战

    1、 新添参数说明 // D river 和Executor内存和CPU资源相关配置 -- 是否开启 executor 动态分配 , 开启时 spark.executor.instances 不生效 spark.dynamicAllocation.enabled= false --配置Driver内存 spark.dirver.memory=5g --driver最大结果大小,设置为0代表不限制,driver在拉取结果时,如果结果超过阈值会报异

    2024年02月21日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包