Flink预加载分区维表,实时更新维表配置信息

这篇具有很好参考价值的文章主要介绍了Flink预加载分区维表,实时更新维表配置信息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

当前我们的业务场景,是基于dataStream代码, 维表数据量很大, 实时性要求很高,所以采用预加载分区维表模式, kafka广播流实时更新配置。

主题:调研预加载分区维表模式
业务特点: 维表配置数据量很大, 实时性要求很高

当前业务场景介绍:当前Flink基于dataStream代码编写, 每个并行度process的open方法加载全量配置数据保存
当前瓶颈点:无法应对超大维表。生产环境维表的配置数据量很大,如果每个并行度都去采用全量的配置会消耗很多内存,同时也会很耗时;有可能加载时间会超过checkpoint设置的timeout时间,导致整个Flink job都起不来,出现Down的情况。
预加载分区维表优点
一个并行度只加载自己那部分的配置,比如全量配置有1万个,Flnk job 20个并行度,采用分区维表平均下来1个并行度只要加载500个配置;大大减少了内存消耗,缩短了加载时间。

预加载分区维表实现方案
1:job初始化时 每个分区open 只加载自己那部分的配置,不用每个分区都全量加载
2: 自定义Partition分区,需要维护一个分区和配置之间的关联关系, 1,配置会到指定的分区,2.数据流来了要跑到指定的分区,要不然会匹配不到配置; 解决方案:使配置和数据流都有的唯一键做hashcode然后取余, (planId.hashcode% 20).abs ,保证planId会到指定的分区
3: 配置信息实时更新;用户在后台更新了某条配置,同时Flink内部也要实时更新。解决方案:后台将配置传入kafka topic;Flink job消费topic转换成广播流,使用process实时更新分区内的配置信息
4:配置信息要比数据流先到, 要不然会有部分数据流没有匹配到配置, 当前业内有两种解决方案, 第一种:采用processFunction内的open方法去加载所有配置,保证配置比数据流先到, 第二种:将比配置流先到的数据流缓存下来,比如放到ListState里存储下来,等配置流到了之后在去匹配; 当前方案采用的是第一种

衡量指标

总体来讲,关联维表有三个基础的方式:实时数据库查找关联(Per-Record Reference Data Lookup)、预加载维表关联(Pre-Loading of Reference Data)和维表变更日志关联(Reference Data Change Stream),而根据实现上的优化可以衍生出多种关联方式,且这些优化还可以灵活组合产生不同效果(不过为了简单性这里不讨论同时应用多种优化的实现方式)。对于不同的关联方式,我们可以从以下 7 个关键指标来衡量(每个指标的得分将以 1-5 五档来表示):

实现简单性: 设计是否足够简单,易于迭代和维护。

吞吐量: 性能是否足够好。

维表数据的实时性: 维度表的更新是否可以立刻对作业可见。

数据库的负载: 是否对外部数据库造成较大的负载(负载越低分越高)。

内存资源占用: 是否需要大量内存来缓存维表数据(内存占用越少分越高)。

可拓展性: 在更大规模的数据下会不会出现瓶颈。

结果确定性: 在数据延迟或者数据重放情况下,是否可以得到一致的结果。

启动预加载分区维表
对于维表比较大的情况,可以启动预加载维表基础之上增加分区功能。简单来说就是将数据流按字段进行分区,然后每个 Subtask 只需要加在对应分区范围的维表数据。值得注意的是,这里的分区方式并不是用 keyby 这种通用的 hash 分区,而是需要根据业务数据定制化分区策略,然后调用 DataStream#partitionCustom。比如按照 userId 等区间划分,0-999 划分到 subtask 1,1000-1999 划分到 subtask 2,以此类推。而在 open() 方法中,我们再根据 subtask 的 id 和总并行度来计算应该加载的维表数据范围。

Flink预加载分区维表,实时更新维表配置信息
启动预加载分区维表介绍:
通过这种分区方式,维表的大小上限理论上可以线性拓展,解决了维表大小受限于单个 TaskManager 内存的问题(现在是取决于所有 TaskManager 的内存总量),但同时给带来设计和维护分区策略的复杂性。

缓存方式
Flink预加载分区维表,实时更新维表配置信息
之前业务场景是采用的第一种, 但是配置数据量越来越大,已经不能支撑业务,所以模拟调研第三种方式,设计和维护分区策略

代码实验
Flink设置4个并行度, 2个taskmanager

-m yarn-cluster -p 4 -yjm 1024m -ytm 2048m -ynm $application_name -ys 2

Flink预加载分区维表,实时更新维表配置信息
Flink预加载分区维表,实时更新维表配置信息
采用自定义Partition设计和维护分区策略,数据流和维表connect

.filter(_.nonEmpty)
.map(_.get)
.partitionCustom(new CustomPartitioner(),data => {
    s"${data.datas.controlPlanId}"
})
.connect(indicatorConfigBroadcastStream)
.process(new FdcIndicatorConfigBroadcastProcessFunction)
.name("FdcGenerateIndicator")
.uid("FdcGenerateIndicator")

自定义Partition分区类

import org.apache.flink.api.common.functions.Partitioner
import org.slf4j.{Logger, LoggerFactory}


class CustomPartitioner extends Partitioner[String]{
  lazy private val logger: Logger = LoggerFactory.getLogger(classOf[CustomPartitioner])




  override def partition(key: String, numPartitions: Int): Int = {
    logger.warn("分区总数"+numPartitions)


    return (key.hashCode % numPartitions).abs
  }
}

BroadcastProcessFunction

class ConfigBroadcastProcessFunction
  extends BroadcastProcessFunction[fdcWindowData, JsonNode,
    (ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])] {


  lazy private val logger: Logger = LoggerFactory.getLogger(classOf[FdcIndicatorConfigBroadcastProcessFunction])

  // 初始化
  override def open(parameters: Configuration): Unit = {
    logger.warn(s"getIndexOfThisSubtask: ${getRuntimeContext.getIndexOfThisSubtask}")
    logger.warn(s"getNumberOfParallelSubtasks: ${getRuntimeContext.getNumberOfParallelSubtasks}")

    super.open(parameters)
    // 获取全局变量
    val p = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    ProjectConfig.getConfig(p)
  }
  
  // 数据流
  override def processElement(windowData: fdcWindowData, ctx: BroadcastProcessFunction[fdcWindowData,
    JsonNode, (ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]#ReadOnlyContext,
                              out: Collector[(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]): Unit = {
      logger.warn(s"${getRuntimeContext.getIndexOfThisSubtask}")
  }
  // 广播流
  override def processBroadcastElement(value: JsonNode, ctx: BroadcastProcessFunction[
  fdcWindowData, JsonNode, (ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]#Context,
                                     out: Collector[(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]): Unit = {
 
  }
}

打印结果:
taskmanager1; open的时候打印信息
Flink预加载分区维表,实时更新维表配置信息
taskmanager2; open的时候打印信息
Flink预加载分区维表,实时更新维表配置信息
当数据流来时, processElement中的打印信息
Flink预加载分区维表,实时更新维表配置信息
参考:
https://blog.csdn.net/weixin_44904816/article/details/104305824
https://codeantenna.com/a/IcVVHYGUVi

https://www.jianshu.com/p/66b014dd2e36

https://blog.csdn.net/cloudbigdata/article/details/125013545文章来源地址https://www.toymoban.com/news/detail-485058.html

到了这里,关于Flink预加载分区维表,实时更新维表配置信息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • WebSocket 网络协议(实时更新 )

    WebSocket 是一种在客户端和服务器之间建立双向通信信道的网络协议。它在客户端和服务器之间建立一个持久的、全双工的连接,允许数据在两个方向上实时传输,而不需要像HTTP一样进行多次请求和响应。  WebSocket 的主要优势是减少了服务器和客户端之间的通信延迟,因为数

    2024年01月17日
    浏览(47)
  • Python如何随数据更新实时画图?Python实时动态绘图

    在数据分析和可视化场景中,我们常常需要实现实时动态图表,比如每分钟读取数据库新的记录,及时更新图表显示最新数据,而不是静态显示某个时间点的数据。本文将介绍使用Python matploblib库的animation功能实现实时动态绘图的方法。 问题背景 我们有一个Excel表,其中记录

    2024年02月12日
    浏览(49)
  • ClickHouse如何处理实时更新

    本文通过示例介绍如何处理ClickHouse实时更新。OLAP数据库并不欢迎数据变更操作,ClickHouse也不例外,和其他OLAP产品一样,刚开始ClickHouse甚至不支持更新,更新能力是后来才加上的,但是按照ClickHouse方式增加的。当前ClickHouse更新是异步的,使得在交互应用中难以使用。有很多

    2024年02月15日
    浏览(46)
  • CTF常用工具_实时更新

    近期在做一些ctf题,其中会涉及到许多工具,起初我会使用百度网盘在每一篇博客放置对应的工具,但因网盘上传有上限,所以现在我将练习中所用到所有的工具放置在这篇文章中。 需要下载的小伙伴可随时拿取,分享有效期为永久分享。 常用工具及常用网站为实时分享!

    2024年02月04日
    浏览(45)
  • 实时更新天气微信小程序开发

    1.新建一个天气 weather项目 2.在app.json中创建一个路由页面  当我们点击保存的时候,微信小程序会自动的帮我们创建好页面 3.在weather页面上书写我们的骨架  4.此时我们的页面很怪,因为没有给它添加样式和值。此时我们给它一个样式。(样式写在wxss中) 5.给它值,使用插值

    2024年02月01日
    浏览(93)
  • Java面试基础|数据结构 -实时更新

    1.HashMap和ConcurrentHashMap介绍 核心是一个Node数组, 数据结构与hashMap相似 使用CAS操作来实现无锁的更新,提高了并发性。当更新节点时,它会使用CAS来替换节点的值或链接,如果CAS失败,表明有其他线程也在进行修改,当前线程可以重试或锁定节点 对于复杂的结构修改操作

    2024年01月17日
    浏览(52)
  • vue父子组件传值不能实时更新

    最近做项目,遇到个大坑,这会爬出来了,写个总结,避免下次掉坑。 vue父子组件传值不能实时更新问题,父组件将值传给了子组件,但子组件显示的值还是原来的初始值,并没有实时更新,为什么会出现这种问题呢? 出现这个问题,可能有以下两个原因: 一、 父组件没有

    2024年02月16日
    浏览(52)
  • 数据大屏--->前端实时更新数据的几种方式

    优点:最大的优点就是实现简单 缺点:(1)无用的请求多,客户端不知道服务端什么时候数据更新,只能不停的向服务端发送请求, (2)数据实时性差:客户端还是需要一段时间(3s)才能拿到最新的数据 优点:解决了短轮询每隔几秒向服务端频繁发送请求的问题; 缺点:(1)服务端资源大量消

    2024年04月17日
    浏览(62)
  • cocos tilemap的setTileGIDAt方法不实时更新

    需要取消勾选 Enable Culling。同时代码添加:markForUpdateRenderData函数。 floor.setTileGIDAt(1024+27,newP.x,newP.y,0);    //中心 floor.markForUpdateRenderData(); 具体问题参考官网说明: Cocos Creator 3.2 手册 - 项目设置

    2024年02月07日
    浏览(43)
  • 小程序弹幕自动滚动实时更新数据功能

    需求 最近遇到的需求,写一个弹幕功能 大致就是实现这样的效果 弹幕轮播,上下两排,一共30个弹幕,30个轮播完毕之后获取新的弹幕数据 实现方法  目前我想到的实现方法是用css的动画来实现这个功能 布局层级 给barrageBox盒子一个相对定位 给barrageList绝对定位 先将list盒子

    2024年01月20日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包