实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

这篇具有很好参考价值的文章主要介绍了实时数仓|基于Flink1.11的SQL构建实时数仓探索实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

实时数仓主要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的OLAP分析、实时的数据看板、业务指标实时监控等场景。虽然关于实时数仓的架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。本文会分享基于Flink SQL01搭建一个实时数仓的demo,涉及数据采集、存储、计算、可视化整个处理流程。通过本文你可以了解到:

  • 实时数仓的基本架构
  • 实时数仓的数据处理流程
  • Flink1.11SQL新特性
  • Flink1.11存在的bug
  • 完整的操作案例

古人学问无遗力,少壮工夫老始成。

纸上得来终觉浅,绝知此事要躬行。

案例简介

本文会以电商业务为例,展示实时数仓的数据处理流程。另外,本文旨在说明实时数仓的构建流程,所以不会涉及太复杂的数据计算。为了保证案例的可操作性和完整性,本文会给出详细的操作步骤。为了方便演示,本文的所有操作都是在Flink SQL Cli中完成的。

架构设计

具体的架构设计如图所示:首先通过canal解析MySQLbinlog日志,将数据存储在Kafka中。然后使用Flink SQL对原始数据进行清洗关联,并将处理之后的明细宽表写入kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行JOIN,将聚合后的数据写入MySQL,最后通过FineBI进行可视化展示。

业务数据准备

  • 订单表(order_info

CREATE TABLE `order_info` (
  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',
  `consignee` varchar(100DEFAULT NULL COMMENT '收货人',
  `consignee_tel` varchar(20DEFAULT NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2DEFAULT NULL COMMENT '总金额',
  `order_status` varchar(20DEFAULT NULL COMMENT '订单状态',
  `user_id` bigint(20DEFAULT NULL COMMENT '用户id',
  `payment_way` varchar(20DEFAULT NULL COMMENT '付款方式',
  `delivery_address` varchar(1000DEFAULT NULL COMMENT '送货地址',
  `order_comment` varchar(200DEFAULT NULL COMMENT '订单备注',
  `out_trade_no` varchar(50DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` varchar(200DEFAULT NULL COMMENT '订单描述(第三方支付用)',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `operate_time` datetime DEFAULT NULL COMMENT '操作时间',
  `expire_time` datetime DEFAULT NULL COMMENT '失效时间',
  `tracking_no` varchar(100DEFAULT NULL COMMENT '物流单编号',
  `parent_order_id` bigint(20DEFAULT NULL COMMENT '父订单编号',
  `img_url` varchar(200DEFAULT NULL COMMENT '图片路径',
  `province_id` int(20DEFAULT NULL COMMENT '地区',
  PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';

  • 订单详情表(order_detail

CREATE TABLE `order_detail` (
  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` bigint(20DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint(20DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200DEFAULT NULL COMMENT 'sku名称(冗余)',
  `img_url` varchar(200DEFAULT NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` varchar(200DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';

  • 商品表(sku_info

CREATE TABLE `sku_info` (
  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
  `spu_id` bigint(20DEFAULT NULL COMMENT 'spuid',
  `price` decimal(10,0DEFAULT NULL COMMENT '价格',
  `sku_name` varchar(200DEFAULT NULL COMMENT 'sku名称',
  `sku_desc` varchar(2000DEFAULT NULL COMMENT '商品规格描述',
  `weight` decimal(10,2DEFAULT NULL COMMENT '重量',
  `tm_id` bigint(20DEFAULT NULL COMMENT '品牌(冗余)',
  `category3_id` bigint(20DEFAULT NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` varchar(200DEFAULT NULL COMMENT '默认显示图片(冗余)',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';

  • 商品一级类目表(base_category1

CREATE TABLE `base_category1` (
  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(10NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';

  • 商品二级类目表(base_category2

CREATE TABLE `base_category2` (
  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(200NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint(20DEFAULT NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';

  • 商品三级类目表(base_category3

CREATE TABLE `base_category3` (
  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(200NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint(20DEFAULT NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';

  • 省份表(base_province

CREATE TABLE `base_province` (
  `id` int(20DEFAULT NULL COMMENT 'id',
  `name` varchar(20DEFAULT NULL COMMENT '省名称',
  `region_id` int(20DEFAULT NULL COMMENT '大区id',
  `area_code` varchar(20DEFAULT NULL COMMENT '行政区位码'
ENGINE=InnoDB DEFAULT CHARSET=utf8;

  • 区域表(base_region

CREATE TABLE `base_region` (
  `id` int(20NOT NULL COMMENT '大区id',
  `region_name` varchar(20DEFAULT NULL COMMENT '大区名称',
   PRIMARY KEY (`id`)
ENGINE=InnoDB DEFAULT CHARSET=utf8;

注意:以上的建表语句是在MySQL中完成的,完整的建表及模拟数据生成脚本见:

链接:https://pan.baidu.com/s/1fcMgDHGKedOpzqLbSRUGwA 提取码:zuqw

数据处理流程

ODS层数据同步

关于ODS层的数据同步参见我的另一篇文章基于CanalFlink实现数据实时增量同步()。主要使用canal解析MySQLbinlog日志,然后将其写入到Kafka对应的topic中。由于篇幅限制,不会对具体的细节进行说明。同步之后的结果如下图所示:

DIM层维表数据准备

本案例中将维表存储在了MySQL中,实际生产中会用HBase存储维表数据。我们主要用到两张维表:区域维表商品维表。处理过程如下:

  • 区域维表

首先将

mydw.base_province

mydw.base_region

这个主题对应的数据抽取到MySQL中,主要使用Flink SQLKafka数据源对应的canal-json格式,注意:在执行装载之前,需要先在MySQL中创建对应的表,本文使用的MySQL数据库的名字为dim,用于存放维表数据。如下:

-- -------------------------
--   省份
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT ,
  `area_code`STRING
WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_province',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   省份
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT ,
    `area_code`STRING,
    PRIMARY KEY (idNOT ENFORCED
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_province'-- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   省份
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_province
SELECT *
FROM ods_base_province;

-- -------------------------
--   区域
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   区域
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
    `id` INT,
    `region_name` STRING,
     PRIMARY KEY (idNOT ENFORCED
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_region'-- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   区域
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_region
SELECT *
FROM ods_base_region;

经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:

dim_province

作为维表:

-- ---------------------------------
-- DIM,区域维表,
-- MySQL中创建视图
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br 
     JOIN base_province bp ON br.id= bp.region_id
;

这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:

-- -------------------------
--  一级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
)WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.base_category1',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ;

-- -------------------------
--  一级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
    `id` BIGINT,
    `name` STRING,
     PRIMARY KEY (idNOT ENFORCED
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category1'-- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  一级类目表
--   MySQL Sink Load Data
-- ------------------------- 

INSERT INTO base_category1
SELECT *
FROM ods_base_category1;

-- -------------------------
--  二级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
)WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_category2',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ;

-- -------------------------
--  二级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT,
     PRIMARY KEY (idNOT ENFORCED
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category2'-- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  二级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;

-- -------------------------
-- 三级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
)WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_category3',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--  三级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT,
    PRIMARY KEY (idNOT ENFORCED
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category3'-- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  三级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;

-- -------------------------
--   商品表
--   Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.sku_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   商品表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
   PRIMARY KEY (tm_id) NOT ENFORCED
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'sku_info'-- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   商品
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;

经过上面的步骤,我们可以将创建商品维表的基础数据表同步到MySQL中,同样需要提前创建好对应的数据表。接下来我们使用上面的基础表在mySQLdim库中创建一张视图:

dim_sku_info

,用作后续使用的维表。

-- ---------------------------------
-- DIM,商品维表,
-- MySQL中创建视图
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c3.id AS c1_id,
  c3.name AS c1_name
FROM
(
  sku_info si 
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id =c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
);

至此,我们所需要的维表数据已经准备好了,接下来开始处理DWD层的数据。

DWD层数据处理

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。具体过程如下:

-- -------------------------
--   订单详情
--   Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.order_detail',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   订单信息
--   Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0) ,
  `operate_time` TIMESTAMP(0) ,
  `expire_time` TIMESTAMP(0) ,
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
WITH(
'connector' = 'kafka',
 'topic' = 'mydw.order_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- ---------------------------------
-- DWD,支付订单明细表dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,0),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD,已支付订单明细表
-- dwd_paid_order_detail装载数据
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info
    WHERE order_status = '2' -- 已支付
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail
    ) od 
    ON oi.id = od.order_id;

ADS层数据

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了Kafka中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

  • ads_province_index

首先在MySQL中创建对应的ADS目标表:ads_province_index

CREATE TABLE ads.ads_province_index(
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt) 
) ;

MySQLADS层目标装载数据:

-- Flink SQL Cli操作
-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
--      2.每天每个省份的订单金额
-- ---------------------------------
CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_province_index'
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表
-- ---------------------------------
CREATE TABLE tmp_province_index(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
      province_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_province_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- DIM,区域维表,
-- 创建区域维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_province'
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);

-- ---------------------------------
-- ads_province_index装载数据
-- 维表JOIN
-- ---------------------------------

INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp 
  ON dp.province_id = pc.province_id;

当提交任务之后:观察Flink WEB UI

查看ADS层的ads_province_index表数据:

  • ads_sku_index

首先在MySQL中创建对应的ADS目标表:ads_sku_index

CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100) ,
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
);

MySQLADS层目标装载数据:

-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index'
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);

-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
 order_sku_num BIGINT,
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
   sum(sku_num) order_sku_num,
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
;

-- ---------------------------------
-- tmp_sku_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM,商品维表,
-- 创建商品维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (idNOT ENFORCED
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info'
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- ads_sku_index装载数据
-- 维表JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc 
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id
  ;

当提交任务之后:观察Flink WEB UI

查看ADS层的ads_sku_index表数据:

FineBI结果展示

其他注意点

Flink1.11.0存在的bug

当在代码中使用Flink1.11.0版本时,如果将一个change-log的数据源insert到一个upsert sink时,会报如下异常:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. 
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])

bug目前已被修复,修复可以在Flink1.11.1中使用。文章来源地址https://www.toymoban.com/news/detail-605232.html

到了这里,关于实时数仓|基于Flink1.11的SQL构建实时数仓探索实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于 Flink 构建实时数据湖的实践

    本文整理自火山引擎云原生计算研发工程师王正和闵中元在本次 CommunityOverCode Asia 2023 数据湖专场中的《基于 Flink 构建实时数据湖的实践》主题演讲。 实时数据湖是现代数据架构的核心组成部分,随着数据湖技术的发展,用户对其也有了更高的需求:需要从多种数据源中导入

    2024年02月04日
    浏览(33)
  • flink 实时数仓构建与开发[记录一些坑]

    1、业务库使用pg数据库, 业务数据可以改动任意时间段数据 2、监听采集业务库数据,实时捕捉业务库数据变更,同时实时变更目标表和报表数据 实时数据流图与分层设计说明 1、debezium采集pg库表数据同步到kafka 【kafka模式】 2、flink 消费kafka写入pg或kafka 【upset-kafka,新版k

    2024年02月16日
    浏览(28)
  • NineData:通过一个SQL语句构建实时数仓

    随着企业数据量呈现出爆炸式增长,跨部门、跨应用、跨平台的数据交互需求越来越频繁,传统的数据查询方式已经难以满足这些需求。同时,不同数据库系统之间的数据格式、查询语言等都存在差异,直接进行跨库查询十分困难。 虽然 MySQL、Oracle、PostgreSQL 等数据库系统都

    2024年02月05日
    浏览(38)
  • Apache Flink X Apache Doris构建极速易用的实时数仓架构

    大家好,我叫王磊。是SelectDB 大数据研发。今天给大家带来的分享是《Apache Flink X Apache Doris构建极速易用的实时数仓架构》。 下面是我们的个人介绍:我是Apache Doris Contributor 和阿里云 MVP。同时著有《 图解 Spark 大数据快速分析实战》等书籍。 接下来咱们进入本次演讲的正题

    2023年04月24日
    浏览(31)
  • 美团买菜基于 Flink 的实时数仓建设

    美团买菜是美团自营生鲜零售平台,上面所有的商品都由美团亲自采购,并通过供应链物流体系,运输到距离用户 3km 范围内的服务站。用户从美团买菜平台下单后,商品会从服务站送到用户手中,最快 30 分钟内。 上图中,左侧的时间轴展示了美团买菜的发展历程,右侧展示

    2024年02月09日
    浏览(35)
  • 实时数据湖 Flink Hudi 实践探索

    导读: 首先做个自我介绍,我目前在阿里云云计算平台,从事研究 Flink 和 Hudi 结合方向的相关工作。 目前,Flink + Hudi 的方案推广大概已经有了一年半的时间,在国内流行度也已比较高,主流的公司也会尝试去迭代他们的数仓方案。所以,今天我介绍的主题是 Flink 和 Hudi 在

    2024年01月16日
    浏览(43)
  • 曹操出行基于 Hologres+Flink 的实时数仓建设

    曹操出行 创立于2015年5月21日,是吉利控股集团布局“新能源汽车共享生态”的战略性投资业务,以“科技重塑绿色共享出行”为使命,将全球领先的互联网、车联网、自动驾驶技术以及新能源科技,创新应用于共享出行领域,以“用心服务国民出行”为品牌主张,致力于打

    2024年01月20日
    浏览(35)
  • 基于 Hologres+Flink 的曹操出行实时数仓建设

    本文整理自曹操出行实时计算负责人林震基于 Hologres+Flink 的曹操出行实时数仓建设的分享,内容主要分为以下六部分: 曹操出行业务背景介绍 曹操出行业务痛点分析 Hologres+Flink 构建企业级实时数仓 曹操出行实时数仓实践 曹操出行业务成果分析 未来展望 曹操出行 创立于

    2024年01月19日
    浏览(50)
  • 基于 HBase & Phoenix 构建实时数仓(3)—— Phoenix 安装

    目录 一、主机规划 二、Phoenix 安装 1. 解压、配置环境 2. 复制两个文件 3. 重启 HBase 集群 4. 安装验证 (1)连接 HBase (2)视图映射 (3)表映射 参考:         继续上一篇,本篇介绍在同一环境中安装 Phoenix,并连接上篇部署的 HBase 集群。         所需安装包:Phoenix-5.1.

    2024年04月10日
    浏览(26)
  • 基于 Flink 的实时数仓在曹操出行运营中的应用

    本文整理自曹操出行基础研发部负责人史何富,在 Flink Forward Asia 2023 主会场的分享。本次分享将为大家介绍实时数仓在曹操出行(互联网网约车出行企业)的实时数仓应用场景,以及通过离线场景向实时场景下加速升级而获得的业务价值。内容主要分为以下六部分: 业务简

    2024年01月20日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包