记一次Flink遇到性能瓶颈

这篇具有很好参考价值的文章主要介绍了记一次Flink遇到性能瓶颈。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

这周的主要时间花在Flink上面,做了一个简单的从文本文件中读取数据,然后存入数据库的例子,能够正常的实现功能,但是遇到个问题,我有四台机器,自己搭建了一个standalone的集群,不论我把并行度设置多少,跑起来的耗时都非常接近,实在是百思不得其解。机器多似乎并不能帮助它。 把过程记录在此,看后面随着学习的深入能不能解答出这个问题。
记一次Flink遇到性能瓶颈

尝试过的修复方法

集群搭建

出现这个问题后,我从集群的角度来进行了些修改,
1,机器是2核的,slots被设置成了6,那我就有点怀疑是这个设置问题,因为其实只有2核,设置的多了,反而存在抢占资源,导致运行达不到效果,改成2后效果一样,没有改进。这个参数在
taskmanager.numberOfTaskSlots: 2
2,调整内存, taskmanager 从2G调整为4G, 效果也没有变化。
taskmanager.memory.process.size: 4000m
这里说下这个内存,我们设置的是总的Memory,也就是这个Total Process Memory。
记一次Flink遇到性能瓶颈
剔除掉些比较固定的Memory,剩下的大头就是这个Task Heap 和 Managed Memory。
所以我们调整大小后,它两个也就相应的增加了。 我查了下这两个,可以理解为堆内存和堆外内存,
一个是存放我们程序的对象,会被垃圾回收器回收;一个是堆外内存,比如RockDB 和 缓存 sort,hash 等的中间结果。

程序方面修改

最开始的时候我把保存数据库操作写在MapFunction里面,后来改到SinkFunction里面。
SinkFunction里面保存数据库的方法也进行了反复修改,从开始使用Spring的JdbcTemplate,换成后来直接使用最原始JDBC。 而且还踩了一个坑,开始的时候用的注入的JdbcTemplate, 本地运行没有问题,到了集群上面,发到别的机器的时候,注入的东西就是空的了。
换成原始的JDBC速度能提升不少, 我猜想这里的原因是jdbctemplate做了些多余的事情, JDBC打开一次,后面Invoke的时候就直接存了,效率要高些,所以速度上提升不少。
这里把部分代码贴出来, 在Open的时候就预加载好PreparedStatement, Invoke的时候直接传参数,调用就可以了。

public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
    private PreparedStatement updatePS;
    private PreparedStatement insertPS;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        HikariDataSource dataSource = new HikariDataSource();
        connection = getConnection(dataSource);
        if(connection != null)
        {
            String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
            updatePS = this.connection.prepareStatement(updateSQL);

            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
        if (updatePS != null) {
            updatePS.close();
        }
        if (insertPS != null) {
            insertPS.close();
        }
        //关闭连接和释放资源
        if (connection != null) {
            connection.close();
        }

    }

    /**
     * 每条数据的插入都要调用一次 invoke() 方法
     *
     * @param marketPrice
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(MarketPrice marketPrice, Context context) throws Exception {

        log.info("start save for {}", marketPrice.getPerformanceId().toString() );

        updatePS.setDouble(1,marketPrice.getOpenPrice());
        updatePS.setDouble(2,marketPrice.getHighPrice());
        updatePS.setDouble(3,marketPrice.getLowPrice());
        updatePS.setDouble(4,marketPrice.getClosePrice());
        updatePS.setString(5, marketPrice.getPerformanceId().toString());
        updatePS.setInt(6, marketPrice.getPriceAsOfDate());
        int result = updatePS.executeUpdate();


        log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);

        if(result == 0)
        {
            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
            insertPS.setString(1, marketPrice.getPerformanceId().toString());
            insertPS.setInt(2, marketPrice.getPriceAsOfDate());
            insertPS.setDouble(3,marketPrice.getOpenPrice());
            insertPS.setDouble(4,marketPrice.getHighPrice());
            insertPS.setDouble(5,marketPrice.getLowPrice());
            insertPS.setDouble(6,marketPrice.getClosePrice());

            result = insertPS.executeUpdate();
            log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
        }
    }

}

总结

从多个方面去改进,结果发现还是一样的,就是使用一台机器和使用三台机器,时间上一样的,再怀疑我只能怀疑是某台机器有问题,然后运行的时候,由最慢的机器决定了速度。 我在使用MapFunction的时候有观察到,有的时候,某台机器已经处理上千条,而有的只处理了几十条,到最后完成的时候,大家处理的数量又是很接近的。这样能够解释为什么机器多了,速度却是一样的。但是我没有办法找出哪台机器来。 我自己的本地运行,并行数设置的多,速度上面是有提升的,到了集群就碰到这样的现象,后面看能不能解决它, 先记录在此。文章来源地址https://www.toymoban.com/news/detail-414576.html

到了这里,关于记一次Flink遇到性能瓶颈的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 记一次Selenium框架的爬虫遇到下拉框页面的解决经历

    最近有一个项目需要使用爬虫从某网站抓取全国的医院名称,等级,地址等信息 爬取的url为https://some/website/that/i/can/tell/you/sorry 用浏览器打开这个url会发现,切换不同的省市需要点击左上角的下拉框进行选择 通常遇到这种下拉框页面,我们第一时间想到使用Selenium框架的Sel

    2024年01月21日
    浏览(27)
  • 记一次翻页性能优化

       由于是公司项目,所以不方便给出代码或者视频,只能列一些自己画的流程图。    大致情况如上,前端有7个显示区。在对其进行滚动翻页的时候,存在以下问题:    通过分析代码,调查log发现,翻页切换平均耗时在600ms。其主要的业务逻辑如下: 主要问题有

    2024年02月05日
    浏览(35)
  • 记一次模糊查询踩坑 Flink+ES

    公司需要对商品名称进行模糊模糊查询,考虑到商品表存量数据千万级,直接数据库模糊查询效率肯定极其低下,所以选择使用 ElasticSearch 对商品信息进行模糊查询。 因为需要代替原有的查询接口,保持原有查询接口的入参出参,所以需要全量+增量同步MySQL数据到ES进行索引

    2024年02月05日
    浏览(39)
  • 记一次Flink通过Kafka写入MySQL的过程

    一、前言 总体思路:source --transform --sink ,即从source获取相应的数据来源,然后进行数据转换,将数据从比较乱的格式,转换成我们需要的格式,转换处理后,然后进行sink功能,也就是将数据写入的相应的数据库DB中或者写入Hive的HDFS文件存储。 思路: pom部分放到最后面。 二

    2024年01月24日
    浏览(38)
  • 记一次rax应用用户体验性能优化

    对于前端开发攻城狮们来说,性能优化是一个永恒的话题。随着前端需求复杂度的不断升高,在项目中想始终保持着良好的性能也逐渐成为了一个有挑战的事情。本次分享简述我们在 Rax 项目中常用的一些性能优化方式,并将从近期的一个实际业务需求出发,讲述我在 Rax C端

    2024年02月21日
    浏览(36)
  • 记一次 JMeter 压测 HTTPS 性能问题

    在使用 JMeter 压测时,发现同一后端服务,在单机 500 并发下,HTTP 和 HTTPS 协议压测 RT 差距非常大。同时观测后端服务各监控指标水位都很低,因此怀疑性能瓶颈在 JMeter 施压客户端。 切入点:垃圾回收 首先在施压机观察到 CPU 使用率和内存使用率都很高,详细看下各线程

    2024年01月21日
    浏览(35)
  • 记一次SpringBoot应用性能调优过程

    使用SpringBoot、MyBatis-Plus开发一个接口转发的能,将第三方接口注册到平台中,由平台对外提供统一的地址,平台转发时记录接口的转发日志信息。开发完成后使用Jmeter进行性能测试,使用100个线程、持续压测180秒,测试结果如下,每秒仅支持8个并发。 服务器 作用 CPU核数 内

    2024年02月03日
    浏览(34)
  • 记一次卡顿的性能优化经历实操

    本篇的性能优化不是八股文类的优化方案,而是针对具体场景,具体分析,从排查卡顿根因到一步步寻找解决方案,甚至是规避等方案来最终解决性能问题的经历实操 所以,解决方案可能不通用,不适用于你的场景,但这个解决过程是如何一步步去处理的,解决思路是怎么样

    2024年02月02日
    浏览(28)
  • 测试2年遇到瓶颈,如何跨过这个坎,实现涨薪5k?

    最近和字节跳动的一个老朋友闲聊,感触颇深,据他说公司近期招聘的测试工程师,大多数候选人都有一个“通病”: 在工作2-3年的时候遇到瓶颈,而且是一道很难跨越的坎。 为什么会遇到这种情况?因为大部分测试工程师在工作了一段时间后,都可以完成最初的基本知识

    2023年04月26日
    浏览(63)
  • 性能分析5部曲:瓶颈分析与问题定位,如何快速解决瓶颈?

    一、引言 很多做性能测试的同学都问过我这样一个问题:鱼哥(Carl_奕然),你说性能测试的重点是什么? 我的回答很简单:瓶颈分析与问题定位。 在性能项目的整个周期,不管是脚本设计,脚本编写还是脚本执行,都还算简单。 难点在于如何定位瓶颈,分析瓶颈,解决瓶颈。

    2024年02月20日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包