Flink 优化(六) --------- FlinkSQL 调优

这篇具有很好参考价值的文章主要介绍了Flink 优化(六) --------- FlinkSQL 调优。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


FlinkSQL 官网配置参数:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html

一、设置空闲状态保留时间

Flink SQL 新手有可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆炸。列举两个场景:

➢ FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在状态里,不会清理!要么设置 TTL,要么使用 FlinkSQL 的 interval join。

➢ 使用 Top-N 语法进行去重,重复数据的出现一般都位于特定区间内 (例如一小时或一天内),过了这段时间之后,对应的状态就不再需要了。

Flink SQL 可以指定空闲状态(即未更新的状态)被保留的最小时间,当状态中某个 key 对应的状态未更新的时间达到阈值时,该条状态被自动清理:

#API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
#参数指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "1 h");

二、开启 MiniBatch

MiniBatch 是微批处理,原理是缓存一定的数据后再触发处理,以减少对 State 的访问,从而提升吞吐并减少数据的输出量。MiniBatch 主要依靠在每个 Task 上注册的 Timer 线程来触发微批,需要消耗一定的线程调度性能。

➢ MiniBatch 默认关闭,开启方式如下:

// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");

➢ 适用场景

微批处理通过增加延迟换取高吞吐,如果有超低延迟的要求,不建议开启微批处理。通常对于聚合的场景,微批处理可以显著的提升系统性能,建议开启。

flink sql 参数,Flink,flink,java,jvm
➢ 注意事项:

1)目前,key-value 配置项仅被 Blink planner 支持。
2)1.12 之前的版本有 bug,开启 miniBatch,不会清理过期状态,也就是说如果设置状态的 TTL,无法清理过期状态。1.12 版本才修复这个问题。

参考 ISSUE:https://issues.apache.org/jira/browse/FLINK-17096

三、开启 LocalGlobal

原理概述

LocalGlobal 优化将原先的 Aggregate 分成 Local+Global 两阶段聚合 , 即 MapReduce 模型中的 Combine+Reduce 处理模式。第一阶段在上游节点本地攒一批数据进行聚合 (localAgg) ,并输出这次微批的增量值 (Accumulator)。第二阶段再将收到的 Accumulator 合并 (Merge) ,得到最终的结果 (GlobalAgg) 。

LocalGlobal 本质上能够靠 LocalAgg 的聚合筛除部分倾斜数据,从而降低 GlobalAgg的热点,提升性能。结合下图理解 LocalGlobal 如何解决数据倾斜的问题。

flink sql 参数,Flink,flink,java,jvm
由上图可知:

  • 未开启 LocalGlobal 优化,由于流中的数据倾斜,Key 为红色的聚合算子实例需要处理更多的记录,这就导致了热点问题。
  • 开启 LocalGlobal 优化后,先进行本地聚合,再进行全局聚合。可大大减少 GlobalAgg 的热点,提高性能。

➢ LocalGlobal 开启方式:

1)LocalGlobal 优化需要先开启 MiniBatch,依赖于 MiniBatch 的参数。
2)table.optimizer.agg-phase-strategy: 聚合策略。默认 AUTO,支持参数 AUTO、TWO_PHASE(使用 LocalGlobal 两阶段聚合)、ONE_PHASE(仅使用 Global 一阶段聚合)。

// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");

// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

➢ 注意事项:
1)需要先开启 MiniBatch
2)开启 LocalGlobal 需要 UDAF 实现 Merge 方法。

四、开启 Split Distinct

LocalGlobal 优化针对普通聚合 (例如 SUM、COUNT、MAX、MIN 和 AVG ) 有较好的效果,对于 DISTINCT 的聚合 (如 COUNT DISTINCT) 收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 Global 节点仍然存在热点。

原理概述

之前,为了解决 COUNT DISTINCT 的热点问题,通常需要手动改写为两层聚合 (增加按 Distinct Key 取模的打散层 )。

从 Flink1.9.0 版本开始,提供了 COUNT DISTINCT 自动打散功能 , 通过HASH_CODE(distinct_key) % BUCKET_NUM 打散,不需要手动重写。Split Distinct 和
LocalGlobal 的原理对比参见下图。

flink sql 参数,Flink,flink,java,jvm
Distinct 举例:

SELECT a, COUNT(DISTINCT b)
FROM T
GROUP BY a

手动打散举例:

SELECT a, SUM(cnt)
FROM (
 SELECT a, COUNT(DISTINCT b) as cnt
 FROM T
 GROUP BY a, MOD(HASH_CODE(b), 1024)
)
GROUP BY a

➢ Split Distinct 开启方式

默认不开启,使用参数显式开启:

  • table.optimizer.distinct-agg.split.enabled: true,默认 false。
  • table.optimizer.distinct-agg.split.bucket-num: Split Distinct 优化在第一层聚合中,被打散的 bucket 数目。默认 1024。
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:(要结合 minibatch 一起使用)
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

➢ 注意事项:

(1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。
(2)拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。
(3)该功能在 Flink1.9.0 版本及以上版本才支持。

五、多维 DISTINCT 使用 Filter

原理概述

在某些场景下,可能需要从不同维度来统计 count(distinct)的结果(比如统计 uv、app 端的 uv、web 端的 uv),可能会使用如下 CASE WHEN 语法。

SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB_b,
COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a

在这种情况下,建议使用 FILTER 语法, 目前的 Flink SQL 优化器可以识别同一唯一键上的不同 FILTER 参数。如,在上面的示例中,三个 COUNT DISTINCT 都作用在 b 列上。

此时,经过优化器识别后,Flink 可以只使用一个共享状态实例,而不是三个状态实例,可减少状态的大小和对状态的访问。

将上边的 CASE WHEN 替换成 FILTER 后,如下所示:

SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('A', 'B')) AS AB_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('C', 'D')) AS CD_b
FROM T
GROUP BY a

六、设置参数总结

总结以上的调优参数,代码如下:文章来源地址https://www.toymoban.com/news/detail-621400.html

// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// 指定时区
configuration.setString("table.local-time-zone", "Asia/Shanghai");

到了这里,关于Flink 优化(六) --------- FlinkSQL 调优的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【业务功能篇86】微服务-springcloud-系统性能压力测试-jmeter-性能优化-JVM参数调优

      压力测试是给软件不断加压,强制其在极限的情况下运行,观察它可以运行到何种程度,从而发现性能缺陷,是通过搭建与实际环境相似的测试环境,通过测试程序在同一时间内或某一段时间内,向系统发送预期数量的交易请求、测试系统在不同压力情况下的效率状况,

    2024年02月10日
    浏览(39)
  • Flink 学习十 FlinkSQL

    flink sql 基于flink core ,使用sql 语义方便快捷的进行结构化数据处理的上层库; 类似理解sparksql 和sparkcore , hive和mapreduce 1.1 工作流程 整体架构和工作流程 数据流,绑定元数据 schema ,注册成catalog 中的表 table / view 用户使用table Api / table sql 来表达计算逻辑 table-planner利用 apache calci

    2024年02月10日
    浏览(33)
  • 【业务功能篇86】微服务-springcloud-系统性能压力测试-jmeter-性能优化-JVM参数调优-Nginx实现动静分离

      压力测试是给软件不断加压,强制其在极限的情况下运行,观察它可以运行到何种程度,从而发现性能缺陷,是通过搭建与实际环境相似的测试环境,通过测试程序在同一时间内或某一段时间内,向系统发送预期数量的交易请求、测试系统在不同压力情况下的效率状况,

    2024年02月07日
    浏览(50)
  • Flink:FlinkSql解析嵌套Json

    日常开发中都是用的简便json格式,但是偶尔也会遇到嵌套json的时候,因此在用flinksql的时候就有点麻烦,下面用简单例子简单定义处理下 1,数据是网上摘抄,但包含里常用的大部分格式 {     \\\"afterColumns\\\": {         \\\"created\\\": \\\"1589186680\\\",         \\\"extra\\\": {             \\\"

    2023年04月09日
    浏览(22)
  • Flink实战-(6)FlinkSQL实现CDC

    FlinkSQL说明 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初

    2023年04月26日
    浏览(39)
  • flink学习35:flinkSQL查询mysql

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions} object sqlQueryTable {   def main(args: Array[String]): Unit = {     //create env     val env = StreamExecutionEnvironment.getExecutionEnv

    2023年04月23日
    浏览(37)
  • JVM调优篇:探索Java性能优化的必备种子面试题

    首先面试官会询问你在进行JVM调优之前,是否了解JVM内存模型的基础知识。这是一个重要的入门问题。JVM内存模型主要包括程序计数器、堆、本地方法栈、Java栈和方法区(1.7之后更改为元空间,并直接使用系统内存)。 正常堆内存又分为年轻代和老年代。在Java虚拟机中,年

    2024年02月15日
    浏览(42)
  • 【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置

    SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。 SQL 提示一般可以用于以下: 增强 planner:没有完美的 planner, SQL 提示让用户更好地控制执行; 增加元数据(或者统计信息):如\\\"已扫描的表索引\\\"和\\\"一些混洗键(shu

    2024年04月25日
    浏览(22)
  • 【Flink系列七】TableAPI和FlinkSQL初体验

    Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理

    2024年02月03日
    浏览(28)
  • Java线上故障排查(CPU、磁盘、内存、网络、GC)+JVM性能调优监控工具+JVM常用参数和命令

    根据服务部署和项目架构,从如下几个方面排查: (1)运用服务器:排查内存,cpu,请求数等; (2)文件图片服务器:排查内存,cpu,请求数等; (3)计时器服务器:排查内存,cpu,请求数等; (4)redis服务器:排查内存,cpu,连接数等; (5)db服务器:排查内存,cpu,连接数

    2024年02月07日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包