Flink实时电商数仓(十)

这篇具有很好参考价值的文章主要介绍了Flink实时电商数仓(十)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

common模块回顾

  1. app
    • BaseApp: 作为其他子模块中使用Flink - StreamAPI的父类,实现了StreamAPI中的通用逻辑,在其他子模块中只需编写关于数据处理的核心逻辑。
    • BaseSQLApp: 作为其他子模块中使用Flink- SQLAPI的父类。在里面设置了使用SQL API的环境、并行度、检查点等固定逻辑。
  2. bean:存放其他子模块中使用到的javaBean对象,因为如果一直使用jsonObject对象调用数据的话,需要使用类似getString("字段名")的方式,没有直接使用javaBean对象那么方便。
  3. constant
    • 存储字符串常量
    • 为了保证一致性,如果某个常量修改时,只需在这里修改即可对整个项目进行修改
  4. function
    • DorisMapFunction:将javaBean对象转换为对应的json字符串对象,并且将驼峰式命名方式修改为蛇形命名方式。便于写入doris。
  5. util
    • DateFormateUtil
    • FlinkSinkUtil
    • FlinkSourceUtil
    • HBaseUtil
    • IkUtil
    • JdbcUtil
    • SQLUtil
      • getUpsertKafakaSQL: 一定要声明主键,支持撤回流
      • getDorisSinkSQL: 用于写入Doris

dim层回顾

  • Flink-cdc监控mysql中的维度配置表
  • 将监控的数据流做成广播流
  • 将广播流和读取数据的主流进行connect
  • 主流数据根据广播流的配置信息进行分流,注意需要先提前缓存一次配置表信息
  • 达到动态拆分数据表的效果

dwd层FlinkSQL回顾

  • 注意join时会将所有数据都存储到内存中,需要考虑设置TTL
  • 大表join小表时,可以考虑使用lookup join
  • 如果数据流有明确的先后关系时,考虑使用Interval join

在支付成功模块,由于订单详情表处理时已经存在撤回流,但支付成功模块也是使用left join方式调用订单详情数据,会导致产生两次撤回流。在后续dws层处理时,要注意对数据进行去重过滤。

dws层回顾

  • 如何判断使用FlinkSQL还是StreamAPI
    • 如果比较标准化, 比如简单的开窗聚合,一般使用FlinkSQL
    • 如果需要使用状态处理数据,比如判断是否为独立用户,使用StreamAPI

交易域sku粒度订单下单各窗口汇总

  • 需求分析:从Kafka订单明细主题读取数据,过滤null数据并按照唯一键对数据去重,按照SKU维度分组,统计原始金额、活动减免金额、优惠券减免金额和订单金额,并关联维度信息,将数据写入Doris交易域SKU粒度下单各窗口汇总表

  • 思路分析:

    • 方案一:按照订单ID进行分组,根据业务逻辑设置定时器取最后一个数据进行发送
    • 方案二:将度量值存放到状态中,每次新数据到达时,将新的度量值减去状态中的度量值
  • 具体实现文章来源地址https://www.toymoban.com/news/detail-773574.html

    • 因为需要使用状态,故使用BaseApp; 设置端口号10029,并发度4,消费者组为类名,消费者主题名称为dwd订单详情
    • 读取dwd下单主题数据, stream.print()
    • 过滤清洗:
      • 去掉null数据, stream.flatMap(new FlatMapFunction<>())
      • ts: 水位线,不能为空;进行位数的修正,如果是10位的,使用 jsonObj.put("ts", ts*1000)
      • id: keyby的关键字,不能为空
      • sku_id: group by的粒度关键字,也不能为空
    • 添加水位线
      • 网络延迟5L
      • 添加数据的泛型,提取数据中的ts,作为水位线(注意观察ts的位数,需要为13位,毫秒级)
    • 修正度量值,转换数据结构
      • 使用id关键字进行分组
      • 使用process算子中的状态来进行处理stream.process(new KeyedProcessFunction<>),返回值为对应的javabean对象
      • 在状态中存储上一次的度量值大小,只保存30秒
      • processElement()方法中获取状态中的度量值,使用前需要判空,如果为空设置为0,之后才能进行数值计算。
      • 创建对应的bean对象,度量值都减去状态中的度量值和更新状态中当前的度量值
    • 分组开窗聚合
      • 使用skuId进行keyby
      • 分组后使用window算子进行开窗,设置窗口时间,注意Time属于org.apache.flink.streaming.api.windowing.time.Time.seconds()
      • 使用reduce算子进行聚合计算, 聚合时需要累积所有度量值
      • new ProcessWindowFunction()获取窗口信息, startTime, EndTime, curTime, 获取到后写入javaBean对象中
    • 关联维度信息
      • 先分组聚合再关联维度信息的原因:关联维度信息需要join操作,是很耗费性能的大操作。先聚合数据能大幅度减少数据量。
      • 启动HBase,查看对的sku_info表中是否存储着对应的维度信息
      • 获取外部连接,需要使用生命周期方法(open,close在整个算子执行过程中只运行一次);对应的关联维度信息,即RichMapFunction()
      • map方法中使用HBase的API读取表格数据,使用读取到的字段补全原本的信息
    • 创建HBase的API:读取表格数据 get
      • 获取table
      • 创建get对象
      • 调用get方法
      • 获取数据写入jsonObj
    • 写出到Doris

维度关联优化

  1. 旁路缓存:独立缓存服务有(redis, memcache).
    Flink实时电商数仓(十),flink,大数据
  • 使用旁路缓存时要注意保持数据的一致性,如果数据发生修改和删除,直接删除redis中的数据。

同步+旁路缓存模式

  1. 引入Jedis相关依赖
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
  1. 创建Redis工具类RedisUtil
  2. 在RichMapFunction中的open和close方法中获取和关闭HBase和Redisd的连接。
  3. 拼接对应的redisRowKey
  4. 读取Redis缓存的数据,jsonObj的字符串
  5. 判断redis读取到的数据是否为空
    • 没有数据:需要读取HBase;jsonObj = HBaseUtil.getCells(), 读取到数据后,使用jedis.setex()存储到redis
    • redis有缓存,直接返回
  6. 进行维度关联

Dim层写入HBase修正

  • 在dim层将数据写入HBase时,需要同时获取Redis的连接。
  • 判断redis中的缓存是否发生变化
    • 判断数据类型是修改或删除时,删除Redis中对应的数据
    • 拼写数据的rowkey
    • 使用jedis.del(rediskey)来删除

到了这里,关于Flink实时电商数仓(十)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink 实时数仓 (一) --------- 数据采集层

    1. 普通实时计算与实时数仓比较 普通的实时计算优先考虑时效性,所以从数据源采集经过实时计算直接得到结果。如此做时效性更好,但是弊端是由于计算过程中的中间结果没有沉淀下来,所以当面对大量实时需求的时候,计算的复用性较差,开发成本随着需求增加直线上升

    2024年02月06日
    浏览(47)
  • Flink+Doris 实时数仓

    Doris基本原理 Doris基本架构非常简单,只有FE(Frontend)、BE(Backend)两种角色,不依赖任何外部组件,对部署和运维非常友好。架构图如下 可以 看到Doris 的数仓架构十分简洁,不依赖 Hadoop 生态组件,构建及运维成本较低。 FE(Frontend)以 Java 语言为主,主要功能职责: 接收用户

    2024年02月07日
    浏览(51)
  • Flink CDC和Flink SQL构建实时数仓Flink写入Doris

    软件环境 Flink1.13.3 Scala 2.12 doris 0.14 一、MySQL 开启binlog日志、创建用户 1.开启bin log MySQL 8.0默认开启了binlog,可以通过代码show variables like \\\"%log_bin%\\\";查询是否开启了,show variables like \\\"%server_id%\\\";查询服务器ID。 上图分别显示了bin long是否开启以及bin log所在的位置。 2.创建用户 C

    2024年02月02日
    浏览(78)
  • 实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

    实时数仓主要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的 OLAP 分析、实时的数据看板、业务指标实时监控等场景。虽然关于实时数仓的架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。本文会分享基于 Flink

    2024年02月16日
    浏览(48)
  • Flink实时数仓同步:拉链表实战详解

    在大数据领域,业务数据通常最初存储在关系型数据库,例如MySQL。然而,为了满足日常分析和报表等需求,大数据平台会采用多种不同的存储方式来容纳这些业务数据。这些存储方式包括离线仓库、实时仓库等,根据不同的业务需求和数据特性进行选择。 举例来说,假设业

    2024年01月20日
    浏览(56)
  • flink 实时数仓构建与开发[记录一些坑]

    1、业务库使用pg数据库, 业务数据可以改动任意时间段数据 2、监听采集业务库数据,实时捕捉业务库数据变更,同时实时变更目标表和报表数据 实时数据流图与分层设计说明 1、debezium采集pg库表数据同步到kafka 【kafka模式】 2、flink 消费kafka写入pg或kafka 【upset-kafka,新版k

    2024年02月16日
    浏览(40)
  • Flink实时数仓之用户埋点系统(一)

    用户行为采集 行为数据:页面浏览、点击、在线日志等数据 活跃数据:用户注册、卸载安装、活跃等数据 App性能日志:卡顿、异常等数据 业务数据采集 业务数据:支付等 维度表:渠道、商品等 用户行为日志 日志结构大致可分为两类,一是页面日志,二是启动日志和在线

    2024年04月11日
    浏览(44)
  • GaussDB(DWS)基于Flink的实时数仓构建

    本文分享自华为云社区《GaussDB(DWS)基于Flink的实时数仓构建》,作者:胡辣汤。 大数据时代,厂商对实时数据分析的诉求越来越强烈,数据分析时效从T+1时效趋向于T+0时效,为了给客户提供极速分析查询能力,华为云数仓GaussDB(DWS)基于流处理框架Flink实现了实时数仓构建。在

    2024年04月22日
    浏览(43)
  • 美团买菜基于 Flink 的实时数仓建设

    美团买菜是美团自营生鲜零售平台,上面所有的商品都由美团亲自采购,并通过供应链物流体系,运输到距离用户 3km 范围内的服务站。用户从美团买菜平台下单后,商品会从服务站送到用户手中,最快 30 分钟内。 上图中,左侧的时间轴展示了美团买菜的发展历程,右侧展示

    2024年02月09日
    浏览(45)
  • 曹操出行基于 Hologres+Flink 的实时数仓建设

    曹操出行 创立于2015年5月21日,是吉利控股集团布局“新能源汽车共享生态”的战略性投资业务,以“科技重塑绿色共享出行”为使命,将全球领先的互联网、车联网、自动驾驶技术以及新能源科技,创新应用于共享出行领域,以“用心服务国民出行”为品牌主张,致力于打

    2024年01月20日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包