Flink电商实时数仓(三)

这篇具有很好参考价值的文章主要介绍了Flink电商实时数仓(三)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

DIM层代码流程图

维度层的重点和难点在于实时电商数仓需要的维度信息一般是动态的变化的,并且由于实时数仓一般需要一直运行,无法使用常规的配置文件重启加载方式来修改需要读取的ODS层数据,因此需要通过Flink-cdc实时监控MySql中的维度数据配置信息表,实时动态的发布广播信息。主流数据根据广播数据及时调整处理逻辑,并自动在HBase中创建相应的维度表和写入相应的维度数据。

  1. 消费Kafka ods业务主题数据
  2. 数据清洗:是否为JSON格式
  3. 使用flink-cdc读取监控配置表数据
  4. 在HBase中创建维度表
  5. 做成广播流
  6. 连接主流和广播流
  7. 筛选出需要写出的字段
  8. 写出到Hbase

Flink电商实时数仓(三),flink,linq,大数据

整体架构

  • realtime-common模块
    • base: 所有Flink程序的基类,负责搭建Flink运行环境和设置并行度和检查点等相关参数。其中我们的数据来源也确定为Kafka,故数据源代码也写在这里。每个Flink程序的具体处理逻辑由handle()函数来负责处理。
    • bean:负责存放项目运行过程中需要用到的bean对象,比如当前flink-cdc程序中需要用到的TableProcessDim类,配置信息表对象。
    • constant:负责存放程序中需要使用到常量参数
    • function:负责存放一些通用的函数方法
    • util:一般存放和数据连接相关的工具类
    • test目录: 用来在写正式代码前测试连接是否通畅,数据是否可以正常发送。
  • realtime-dim模块
    • app:DimApp里面写的是dim层的具体实现,具体步骤如上述流程图所示。
    • function:负责存放数据处理的实现类,一般会继承相应的父类,在dim层可以直接调用这里的子类来实现父类接口,让dim层的代码逻辑更加清晰。
  • realtime-dwd模块:如上
  • realtime-dws模块:如上

Flink电商实时数仓(三),flink,linq,大数据

数据清洗ETL

数据清洗,简单来说就是对数据进行简单的转换筛选。首先如果在转换过程中出现异常,直接过滤掉。注意这里无需抛出异常,因为如果throw a exception会导致整个程序异常终止,而在数据处理过程中出现部分数据格式错误而无法正常进行格式转换是很常见的,只需将异常信息打印到控制台即可。如果转换正常,再判断是否满足以下三个条件:

  1. 数据库名为gmall
  2. 数据类型不是bootstrap-start或者bootstrap-complete
  3. data字段不是null且长度不为0

Flink-cdc读取配置表的数据

Flink中获取数据主要有两个步骤:

  1. 获取相应的数据源Source
    • 注意:在构建Flink-cdc对应的MySQLSource时,tableList参数必须是库表.表名结构
  2. 调用env.fromSource()方法将数据源的发送过来的数据转换Ds数据流,在该方法中可以设置数据的水位线。
  3. 获取到数据后,建议先打印到控制台查看数据的具体结构。
  4. 注意读取配置信息表的并发度必须设置为1;如果不为1,只能读取r操作数据,其他更新数据无法读取。
public static MySqlSource<String> getMySqlSource(String databaseName, String tableName){
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_HOST)
                .port(Constant.MYSQL_PORT)
                .username(Constant.MYSQL_USER_NAME)
                .password(Constant.MYSQL_PASSWORD)
                .databaseList(databaseName) // set captured database
                .tableList(databaseName+"."+tableName) // set captured table

                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .startupOptions(StartupOptions.initial())
                .build();

        return mySqlSource;
    }

在HBase中创建维度表

数据库中的配置表数据经过Flink-cdc处理后发送到这里是json格式的字符串,这里根据数据的四种类型op在HBase中进行不同的建表删表操作,同时对数json字符数据进行转换映射处理,转换为对应的bean对象数据流。这里一个数据产生一个处理后的对象,故使用Map算子或FlatMap算子都可以。

  • op类型
    • d 代表delete,需要删除before字段中对应的表
    • c 代表create,r 代表 read,需要创建after字段中对应的表
    • u 代表update,需要先删除掉旧表,然后根据新表的字段创建一个新表
  • 创建HBase连接,创建连接是很耗费资源的行为,因此新建连接和关闭连接需要写在open和close方法中
  • HBase中想要对表进行创建和删除等DDL操作,都由Admin对象管理;如果需要对数据进行插入删除等DML操作,需要创建Table对象。详细操作细节请看相应代码即可。
public static SingleOutputStreamOperator<TableProcessDim> createHbaseTable(DataStreamSource<String> mysqlSource) {
        SingleOutputStreamOperator<TableProcessDim> createHBaseTable = mysqlSource.flatMap(
                new RichFlatMapFunction<String, TableProcessDim>() {


                    public Connection connection ;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //获取连接
                        connection = HBaseUtil.getHBaseConnection();
                    }

                    @Override
                    public void close() throws Exception {
                        //关闭连接
                        HBaseUtil.closeHBaseConn(connection);
                    }

                    @Override
                    public void flatMap(String s, Collector<TableProcessDim> out){
                        //使用读取的配置表数据,到HBase中创建与之对应的表格

                        try {
                            JSONObject jsonObject = JSONObject.parseObject(s);
                            String op = jsonObject.getString("op");
                            TableProcessDim dim;//维度表
                            if ("d".equals(op)) {
                                dim = jsonObject.getObject("before", TableProcessDim.class);
                                dim.setOp(op);
                                //当配置表发送一个D类型的数据,对应的HBase需要删除一张维度表
                                deleteTable(dim);
                            } else if ("c".equals(op) || "r".equals(op)) {
                                dim = jsonObject.getObject("after", TableProcessDim.class);
                                createTable(dim);
                                dim.setOp(op);
                            } else {//op = 'u', 即修改
                                dim = jsonObject.getObject("after", TableProcessDim.class);
                                deleteTable(dim);
                                createTable(dim);
                            }
                            dim.setOp(op);
                            out.collect(dim);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    private void createTable(TableProcessDim dim) {
                        String sinkFamily = dim.getSinkFamily();
                        String[] split = sinkFamily.split(",");
                        try {
                            HBaseUtil.createHBaseTable(connection,Constant.HBASE_NAMESPACE,dim.getSinkTable(),split);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }

                    private void deleteTable(TableProcessDim dim) {
                        try {
                            HBaseUtil.dropHBaseTable(connection, Constant.HBASE_NAMESPACE, dim.getSinkTable());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }
                }
        );
        return createHBaseTable;
    }

主流连接广播流

从Flink-cdc获取的数据(gmall2023_config)是作为一个参数来控制我们对于主流即ODS层数据(gmall数据库的业务数据)的处理逻辑。gmall2023)_config库中的Table_process_dim表决定了后续程序筛选哪个表作为维度信息,并且定义了表中有哪些字段。

  1. 转换为广播流只需要调用上述得到的TableProcessDimStream的broadcast方法
  2. 使用的主流(gmall业务数据)的connect方法,得到一个连接流,然后对连接流进行process处理。
  3. 创建BroadcastProcessFunction,在里面分别有两个函数
    • processBroadcastElement():处理广播流数据
    • processElement():处理主流数据
  4. 广播流处理逻辑:
    • 读取广播状态
    • 将配置表信息写到广播状态中
    • 根据广播状态数据的op对状态做相应的修改
  5. 主流处理逻辑:
    • 查询广播状态,判断当前数据对应的表是否存在于状态中
    • 如果数据比状态来的更早,造成状态为空,需要对状态做预处理(提前从mysql中读取维表配置表信息)
    • 如果根据当前表的表名查询的状态不为空,说明该表为维度数据,使用收集器收集起来。

筛选出需要的字段

Flink电商实时数仓(三),flink,linq,大数据
在维度配置信息表中的sink_column字段里定义了维度表需要的字段,使用filter算子对JsonObj里面的data字段进行过滤即可获取到想要的字段数据。

写出到Hbase

过滤后的数据流调用它的addSink方法,方法中需要传入一个SinkFunction接口类。该接口需要实现三个方法分别是:

  • open方法:获取HBase连接
  • close方法:关闭HBase连接
  • invoke方法:写入数据时调用的方法,根据jsonObj中的type做不同处理,如果是delete,需要删除对应的维度表数据;否则都是直接覆盖写入。

代码的Gitee仓库地址:https://gitee.com/langpaian/gmall2023-realtime.git文章来源地址https://www.toymoban.com/news/detail-772186.html

到了这里,关于Flink电商实时数仓(三)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink 实时数仓 (一) --------- 数据采集层

    1. 普通实时计算与实时数仓比较 普通的实时计算优先考虑时效性,所以从数据源采集经过实时计算直接得到结果。如此做时效性更好,但是弊端是由于计算过程中的中间结果没有沉淀下来,所以当面对大量实时需求的时候,计算的复用性较差,开发成本随着需求增加直线上升

    2024年02月06日
    浏览(46)
  • Flink+Doris 实时数仓

    Doris基本原理 Doris基本架构非常简单,只有FE(Frontend)、BE(Backend)两种角色,不依赖任何外部组件,对部署和运维非常友好。架构图如下 可以 看到Doris 的数仓架构十分简洁,不依赖 Hadoop 生态组件,构建及运维成本较低。 FE(Frontend)以 Java 语言为主,主要功能职责: 接收用户

    2024年02月07日
    浏览(47)
  • Flink CDC和Flink SQL构建实时数仓Flink写入Doris

    软件环境 Flink1.13.3 Scala 2.12 doris 0.14 一、MySQL 开启binlog日志、创建用户 1.开启bin log MySQL 8.0默认开启了binlog,可以通过代码show variables like \\\"%log_bin%\\\";查询是否开启了,show variables like \\\"%server_id%\\\";查询服务器ID。 上图分别显示了bin long是否开启以及bin log所在的位置。 2.创建用户 C

    2024年02月02日
    浏览(76)
  • 实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

    实时数仓主要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的 OLAP 分析、实时的数据看板、业务指标实时监控等场景。虽然关于实时数仓的架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。本文会分享基于 Flink

    2024年02月16日
    浏览(42)
  • Flink实时数仓同步:拉链表实战详解

    在大数据领域,业务数据通常最初存储在关系型数据库,例如MySQL。然而,为了满足日常分析和报表等需求,大数据平台会采用多种不同的存储方式来容纳这些业务数据。这些存储方式包括离线仓库、实时仓库等,根据不同的业务需求和数据特性进行选择。 举例来说,假设业

    2024年01月20日
    浏览(48)
  • flink 实时数仓构建与开发[记录一些坑]

    1、业务库使用pg数据库, 业务数据可以改动任意时间段数据 2、监听采集业务库数据,实时捕捉业务库数据变更,同时实时变更目标表和报表数据 实时数据流图与分层设计说明 1、debezium采集pg库表数据同步到kafka 【kafka模式】 2、flink 消费kafka写入pg或kafka 【upset-kafka,新版k

    2024年02月16日
    浏览(37)
  • 美团买菜基于 Flink 的实时数仓建设

    美团买菜是美团自营生鲜零售平台,上面所有的商品都由美团亲自采购,并通过供应链物流体系,运输到距离用户 3km 范围内的服务站。用户从美团买菜平台下单后,商品会从服务站送到用户手中,最快 30 分钟内。 上图中,左侧的时间轴展示了美团买菜的发展历程,右侧展示

    2024年02月09日
    浏览(42)
  • GaussDB(DWS)基于Flink的实时数仓构建

    本文分享自华为云社区《GaussDB(DWS)基于Flink的实时数仓构建》,作者:胡辣汤。 大数据时代,厂商对实时数据分析的诉求越来越强烈,数据分析时效从T+1时效趋向于T+0时效,为了给客户提供极速分析查询能力,华为云数仓GaussDB(DWS)基于流处理框架Flink实现了实时数仓构建。在

    2024年04月22日
    浏览(39)
  • Flink实时数仓之用户埋点系统(一)

    用户行为采集 行为数据:页面浏览、点击、在线日志等数据 活跃数据:用户注册、卸载安装、活跃等数据 App性能日志:卡顿、异常等数据 业务数据采集 业务数据:支付等 维度表:渠道、商品等 用户行为日志 日志结构大致可分为两类,一是页面日志,二是启动日志和在线

    2024年04月11日
    浏览(42)
  • 曹操出行基于 Hologres+Flink 的实时数仓建设

    曹操出行 创立于2015年5月21日,是吉利控股集团布局“新能源汽车共享生态”的战略性投资业务,以“科技重塑绿色共享出行”为使命,将全球领先的互联网、车联网、自动驾驶技术以及新能源科技,创新应用于共享出行领域,以“用心服务国民出行”为品牌主张,致力于打

    2024年01月20日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包