flink作业 windowAll 转换window

这篇具有很好参考价值的文章主要介绍了flink作业 windowAll 转换window。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一. windowAll 和window介绍

datastream 流中没有使用keyby需要使用windowAll函数,使用了keyby的需要使用window函数

Keyed Windows
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Non-Keyed Windows
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

windowAll的函数: 并行度只能是1,性能不高
flink作业 windowAll 转换window,flink,flink,大数据
window的函数:并行度可以任意,性能高
flink作业 windowAll 转换window,flink,flink,大数据

二. 原flink服务存在问题

2.1 flink作业介绍

线上的flink作业的架构如下图所示:
flink作业 windowAll 转换window,flink,flink,大数据

1.先从rocketmq读取数据,通过windowAll类型的窗口进行10s的数据攒批;
2.攒批的数据经过聚合函数处理,该函数没有做任何操作,只是把数据传到下游,因为使用了windowAll所以此聚合函数并行度只能是1,考虑到该函数没有任何操作只是传递数据,所以不存在性能瓶颈,但是事实不是这样,后面会详细说明;
3.处理函数会调用远程的服务,如果符合要求的数据,就传到sink;
4.sink把数据存到doris数据库。
除了聚合函数外,所有函数都支持设置任意并行度

2.2 出现性能瓶颈

经过监控发现高峰期出现rocketmq消息积压
flink作业 windowAll 转换window,flink,flink,大数据
并且通过flinkui 发现处理函数存在被压,应该是远程服务慢导致,所以想通过增加并行度来提高处理速度
调整并行度:
除了聚合函数外,其他函数并行度都设置为2,通过savepoint恢复任务
不幸运的是报错了:
flink作业 windowAll 转换window,flink,flink,大数据
方法位于:org.apache.flink.runtime.checkpoint.StateAssignmentOperation#checkParallelismPreconditions(org.apache.flink.runtime.checkpoint.OperatorState,org.apache.flink.runtime.executiongraph.ExecutionJobVertex)

探究原因 看报错信息得知,通过savepoint恢复数据时,原算子的最大并行度是1,但是设置了2个并行度,所以恢复失败,聚合算子增加并行度,其他算子为什么最大并行度会是1呢?

跟flink算子链合并有关:
算子链合并介绍:没有keyby操作和并行度相同,算子就会合并在一起,目的是减少数据的网络传输和序列化

flink作业 windowAll 转换window,flink,flink,大数据
如上图所示:聚合函数,处理函数,sink由于并行度相同,所以算子合并了,savepoint时,会取三个函数中最大并行度中最小的一个作为恢复时的最大的并行度;所以最大并行度是1,导致设置了并行度为2就恢复不了了。

解决办法
1.不通过savepoint恢复:
启动时就把除了聚合函数外,其他算子并行度都设置为2,由于并行度不同,所以聚合函数无法和其他算子进行算子合并,就不会发生上面问题。
但是此方法无法通过savepoint启动,解决办法是rocketmq的source可以设置offset或者timestap指定从那个位点开始消费,mysqlsource可以也通过设置binlog的offset指定从那个位点开始消费,达到从savepoint恢复的效果
flink作业 windowAll 转换window,flink,flink,大数据2.先关闭算子链合并操作,在增加并发度:先通过savepoint恢复时把算子链合并关闭,由于关闭了算子链合并,所以算子没有合并再一起,再次进行savepoint时,除了聚合函数,其他算子最大并行度就是128,后面就可以增加并行度,然后通过savepoint来恢复作业了。那调整了并行度可以再次合并算子链吗?答案是不可以的,报错信息如下:flink作业 windowAll 转换window,flink,flink,大数据
对应代码:org.apache.flink.runtime.checkpoint.Checkpoints#loadAndValidateCheckpoint
就是说savepoint的最大并行度是128,但是新的程序最大并行度为1,因为经过了算子链合并,所以会报这个错。
此方法就会导致没办法使用算子链合并的优化,会导致序列化和网络传输增加,影响性能

三. 转换后存在问题

由于上面2个方法都存在一些问题,还有没有别的方法解决?
想法:把windowAll 改成Window ,这样聚合函数就得摆脱并行度为1的限制了
改完之后的图如下
flink作业 windowAll 转换window,flink,flink,大数据
source算子先经过keyby,在进行window操作,聚合函数是继承了ProcessWindowFunction的,这样聚合函数就可以设置任意并行度了。
这样可以通过savepoint来恢复启动吗?
很遗憾,启动后还是报错了,报错如下:
flink作业 windowAll 转换window,flink,flink,大数据
方法代码: org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation#readMetaData
意思是说之前key的序列化和现在key的序列化不兼容导致,所以也没有办法通过savepoint方式恢复,只能启动的设置通过设置source的偏移量或者的从那个时间戳开始消费,这样来达到类似从savepoint的方式恢复启动。

四. 总结

windowall窗口使用注意事项:
1.当并行度设置为1时,会进行算子链合并,如果增加并行度,通过savepoint恢复启动时就会受到windowall的函数影响导致不允许增加并行度,启动失败;
2.需要设置并行度时,在算子链合并的情况下不能通过savepoint恢复启动,只能通过指定从哪个位点开始消费这样方式来启动;或者关闭算子链优化,但是会减低性能;
3.windowall转window时,也不能通过savepoint启动,也只能指定从哪个位点开始消费来启动,这种方式的好处在于每个算子都可以设置任意并行度。文章来源地址https://www.toymoban.com/news/detail-802800.html

到了这里,关于flink作业 windowAll 转换window的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【flink番外篇】2、flink的23种算子window join 和interval join 数据倾斜、分区介绍及详细示例(1)- window join

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(57)
  • Flink流批一体计算(3):FLink作业调度

    架构 所有的分布式计算引擎都需要有集群的资源管理器,例如:可以把MapReduce、Spark程序运行在YARN集群中、或者是Mesos中。Flink也是一个分布式计算引擎,要运行Flink程序,也需要一个资源管理器。而学习每一种分布式计算引擎,首先需要搞清楚的就是:我们开发的分布式应用

    2024年02月10日
    浏览(46)
  • 大数据Flink(七十七):SQL窗口的Over Windows

    文章目录 SQL窗口的Over Windows 一、​​​​​​​时间区间聚合

    2024年02月09日
    浏览(39)
  • flink作业提交流程

    目录 作业提交流程 独立模式 YARN模式 会话模式 单作业模式 应用模式 (1) 一般情况下,由客户端(App)通过分发器提供的 REST 接口,将作业提交给JobManager。 (2)由分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster。 (3)JobMaster 将 JobGraph 解析为可执行的 Exec

    2024年02月12日
    浏览(41)
  • 记一次 Flink 作业启动缓慢

    应用发现,Hadoop集群的hdfs较之前更加缓慢,且离线ELT任务也以前晚半个多小时才能跑完。此前一直没有找到突破口所以没有管他,推测应该重启一下Hadoop集群就可以了。今天突然要重启一个Flink作业,发现有一个过程卡了五分钟。 由上图可知09:36到09:41这两个过程中间花了五

    2024年02月21日
    浏览(46)
  • Flink作业调度的9种状态

    Flink 通过 Task Slots 来定义执行资源。每个 TaskManager 有一到多个 task slot,每个 task slot 可以运行一条由多个并行 task 组成的流水线。 这样一条流水线由多个连续的 task 组成,比如并行度为 n 的 MapFunction 和 并行度为 n 的 ReduceFunction。 例如:一个由数据源、MapFunction 和 ReduceFun

    2024年02月13日
    浏览(34)
  • 【flink】使用flink-web-ui提交作业报错

    使用WebUI提交作业出现错误。 错误截图:  弹框信息: 在弹框中是无法看到具体错误信息的。 需要去 job-manager/logs中看详细信息: Failed to create checkpoint storage at checkpoint coordinator side 无法在检查点协调器端创建检查点存储  怎么还没有办法创建呢???? 看一下我的StateBa

    2024年02月14日
    浏览(51)
  • 【flink番外篇】2、flink的23种算子window join 和interval join 数据倾斜、分区介绍及详细示例(3)- 数据倾斜处理、分区示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(55)
  • Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》 学习笔记如下: 当前,用户可以通过 CLI 提交 PyFlink 作业。对于通过 flink run 提交的 Python 作业,Flink 会执行 python 命令。因此,在启动 Python 作业前,需要先确定当前环境中的 python 命令指向 3.7+ 版本的 Pyt

    2024年02月22日
    浏览(63)
  • 如何处理 Flink 作业频繁重启问题?

    Flink 实现了多种重启策略 固定延迟重启策略(Fixed Delay Restart Strategy) 故障率重启策略(Failure Rate Restart Strategy) 没有重启策略(No Restart Strategy) Fallback重启策略(Fallback Restart Strategy) Flink支持不同的重启策略,以在故障发生时控制作业如何重启 默认的重启策略:如果没有

    2024年02月10日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包