Flink 系例之 SQL 案例 - 订单统计

这篇具有很好参考价值的文章主要介绍了Flink 系例之 SQL 案例 - 订单统计。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本期示例:

将对电商实时订单进行聚合计算,分以下两个维度进行:

1. 统计每 1 分钟的订单数量、用户数量、销售额、商品数

2. 统计每个分类的订单总数量、用户总数量(未去重)、销售额、商品数

流程说明:

flink数据统计,Flink,flink,sql,kafka,Powered by 金山文档

1. 通过模拟电商平台订单简要数据,向 Kafka 消息队列中推送 mq 数据;

2. 通过 flink 集群,建立 SQL 流批处理任务;

3.Kafka 做为 SQL 数据流的输入源,并建立 source 数据表;

4. 通过 SQL 建立查询视图,将聚合算子的结果通过视图输出;

5.Mysql 做为 SQL 数据流的输出源,并建立 sink 数据表;

6. 执行 insert 操作,并在 flink 集群中建立 Task Job 任务,将聚合结果视图输出到 sink 表;

操作过程

1. 前期准备工作

1. 搭建 Flink 集群,保持集群可用,并处于启动状态

2. 开启 flink-sql-client 客户端

3. 搭建 Kafka 消息队列中间件服务,并启动服务,保持 9092 端口可连接

4. 创建 mysql 数据库相关表

5. 上述服务搭建过程,省略...(或参见历史相关文章)

mysql 相关表,如下:

  • 每分钟订单统计表

CREATE TABLE`min_order_count` (
  `time_str`varchar(40) NOT NULL,
  `order_num`int(8) DEFAULT NULL,
  `sales_amount`decimal(12,2) DEFAULT NULL,
  `user_num`int(8) DEFAULT NULL,
  `count_num`int(8) DEFAULT NULL,
  PRIMARY KEY (`time_str`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
  • 分类订单统计表

CREATE TABLE`type_order_count` (
  `goods_type`varchar(40) NOT NULL,
  `order_num`int(8) DEFAULT NULL,
  `sales_amount`decimal(12,2) DEFAULT NULL,
  `user_num`int(8) DEFAULT NULL,
  `count_num`int(8) DEFAULT NULL,
  PRIMARY KEY (`goods_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

参见:flink 启动与退出命令

# 只需启动master主机即可,会自动发送指令启动slaves
./bin/start-cluster.sh 
# 停止集群
./bin/stop-cluster.sh
# 启动sql客户端(依赖集群)
./bin/sql-client.sh embedded
# 退出窗口
quit;

订单数据结构

{
"orderId": "202103201105397126154",
"userName": "褚贰",
"goodsType": "母婴",
"orderTime": "2021-03-20 11:05:39",
"orderTimeSeries": 1616209539712,
"price": 118.82,
"num": 3,
"totalPrice": 356.46
}

2. 统计每 1 分钟的订单数量

2.1 订单数据 source 表

1. 在 flinkSQL 窗口,建立 source 表,并通过连接器从 kafka 中获取数据流;

-- 创建订单来源表(获取kafka数据流)
CREATE TABLE order_source (
orderId STRING,
userName STRING,
goodsType STRING,
price DECIMAL(12, 2),
num INT,
orderTime STRING,
orderTimeSeries BIGINT,
totalPrice DECIMAL(12, 2),
ts AS TO_TIMESTAMP(FROM_UNIXTIME(orderTimeSeries / 1000, 'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR ts as ts - INTERVAL'5'SECOND 
) WITH (
'connector' = 'kafka',
'topic' = 'order_behavior',
'properties.bootstrap.servers' = '192.168.1.1:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);

-- 查看一张表或者视图的 Schema。
DESCRIBE order_source;

示例解读:

-- 表示将orderTimeSeries时间戳转换为ts时间格式
ts AS TO_TIMESTAMP(FROM_UNIXTIME(orderTimeSeries / 1000, 'yyyy-MM-dd HH:mm:ss'))
-- 设置水印,表示按ts时间定义水印位,允许5秒延迟,防止数据并没有严格按时间顺序流入后,对窗口内数据进行再计算
WATERMARK FOR ts as ts - INTERVAL '5' SECOND 
-- 通过计算列产生一个处理时间列(PROCTIME()内置时间函数)
proctime as PROCTIME(),   
-- scan.startup.mode 指offset的消费模式,有五种模式:
earliest-offset表示从topic中最初的数据开始消费
latest-offset表示从topic中最新的数据开始消费
group-offsets表示从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
timestamp表示从topic中指定的时间点开始消费,指定时间点之前的数据忽略,需结合'scan.startup.timestamp-millis'配置一起使用
specific-offsets表示从topic中指定的offset开始,这个比较复杂,需要手动指定offset,结合'scan.startup.specific-offsets'配置一起使用

查询 SQL:

-- 查询结果
select orderId,userName,goodsType,orderTime,orderTimeSeries,price,num,totalPrice,ts from order_source;
-- 统计每种订单类型的总量
select goodsType,count(orderId) as orderCount from order_source group by goodsType;
-- 统计每种订单类型的总价
select goodsType,sum(totalPrice) as totalPrice from order_source group by goodsType;
-- 基于时间字段排序
select orderId,userName,goodsType,totalPrice from order_source order by ts;
-- 查询时间排序的前3条记录。注意:LIMIT 查询需要有一个 ORDER BY 。
select orderId,userName,goodsType,totalPrice from order_source order by ts limit 3;

2.2 建立订单数据 sink 表

1. 订单聚合计算后,需要将结果存入到数据库,通过连接器建立 mysql 数据流输出表(mysql 表需要提前建立)

-- 创建按分钟统计表 
CREATE TABLE min_order_count_sink (
time_str STRING,
order_num INT,
user_num INT,
sales_amount DECIMAL(12, 2),
count_num INT,
PRIMARY KEY (time_str) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.1:3306/flink?useUnicode=true&characterEncoding=utf-8', 
'driver' = 'com.mysql.jdbc.Driver', 
'table-name' = 'min_order_count', 
'username' = 'root',
'password' = 'root' 
);
-- 查看一张表或者视图的 Schema。
DESCRIBE min_order_count_sink ;

2.3 创建查询视图

-- 创建按每1分钟统计订单数的视图
CREATE VIEW count_min_order_view AS 
SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL'1'MINUTE), 'HH:mm') as window_start,
DATE_FORMAT(TUMBLE_END(ts, INTERVAL'1'MINUTE), 'HH:mm') as window_end,
COUNT(orderId) as order_num,
COUNT(userName) as user_num,
SUM(totalPrice) as sales_amount,
SUM(num) as count_num 
FROM order_source 
GROUP BY TUMBLE(ts, INTERVAL'1'MINUTE);
-- 讲解
TUMBLE_START(ts, INTERVAL '1' MINUTE):返回TIMESTAMP类型;返回窗口的起始时间(包含边界)。例如[00:10,00:15)窗口,返回00:10。
TUMBLE_END(ts, INTERVAL '1' MINUTE):返回TIMESTAMP类型;返回窗口的结束时间(包含边界)。例如[00:00, 00:15]窗口,返回00:15。
GROUPBYTUMBLE(ts, INTERVAL '1' MINUTE), goodsType:组合分组统计字段:ts时间属性(1分钟内所有数据)和goodsTypeTUMBLE函数:用在GROUPBY子句中,用来定义滚动窗口。

2.4 建立 JOB 任务

在 SQL 窗口执行以下 insert 后,并会在 flink 平台建立 job 任务,JOB 任务运行后,按窗口规则将统计结果从视图中插入到 sink 数据输出表。

-- 将视图数据插入到按每分统计的维度表
insert into min_order_count_sink 
select window_end as time_str,
cast(order_num asINT),
cast(user_num asINT) 
,cast(sales_amount asDECIMAL)
,cast(count_num asINT)
from count_min_order_view;

2.5 按类型统计订单信息

创建 Mysql 输出表

-- 创建按类型统计订单信息
CREATE TABLE type_order_count_sink (
goods_type STRING,
order_num INT,
user_num INT,
sales_amount DECIMAL(12, 2),
count_num INT,
PRIMARY KEY (goods_type) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.1:3306/flink?useUnicode=true&characterEncoding=utf-8', 
'driver' = 'com.mysql.jdbc.Driver', 
'table-name' = 'type_order_count', 
'username' = 'root',
'password' = 'root' 
);
-- 查看一张表或者视图的 Schema。
DESCRIBE type_order_count_sink ;

基于时间窗口的聚合统计查询

示例 1

-- 使用Event Time统计每个分类每分钟的订单数示例
SELECT
TUMBLE_START(ts, INTERVAL'1'MINUTE) as window_start,
TUMBLE_END(ts, INTERVAL'1'MINUTE) as window_end,
goodsType,
COUNT(orderId)
FROM order_source
GROUP BY TUMBLE(ts, INTERVAL'1'MINUTE), goodsType;
-- 讲解
TUMBLE_START(ts, INTERVAL '1' MINUTE):返回TIMESTAMP类型;返回窗口的起始时间(包含边界)。例如[00:10,00:15)窗口,返回00:10。
TUMBLE_END(ts, INTERVAL '1' MINUTE)	:返回TIMESTAMP类型;返回窗口的结束时间(包含边界)。例如[00:00, 00:15]窗口,返回00:15。
GROUPBYTUMBLE(ts, INTERVAL '1' MINUTE), goodsType:组合分组统计字段:ts时间属性(1分钟内所有数据)和goodsTypeTUMBLE函数:用在GROUPBY子句中,用来定义滚动窗口。

示例 2

-- 统计每个分类过去1分钟的订单数,每30秒更新1次,即1分钟的窗口,30秒滑动1次。
SELECT
HOP_START (ts, INTERVAL'30'SECOND, INTERVAL'1'MINUTE),
HOP_END (ts, INTERVAL'30'SECOND, INTERVAL'1'MINUTE),
goodsType,
COUNT (orderId)
FROM order_source
GROUP BY HOP (ts, INTERVAL'30'SECOND, INTERVAL'1'MINUTE), goodsType;
-- 讲解
HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE):返回TIMESTAMP类型;返回窗口的起始时间(包含边界)。例如[00:10, 00:15) 窗口,返回00:10 。
HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE):返回TIMESTAMP类型;	返回窗口的结束时间(包含边界)。例如[00:00, 00:15) 窗口,返回00:15。
GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), goodsType: 组合分组统计字段:ts时间属性(每次加载30秒,但统计1分钟内所有数据)和goodsType
HOP函数: 用在groupby子句中,用来定义滑动窗口。

示例 3

SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL'1'MINUTE), 'HH:mm') as window_start,
DATE_FORMAT(TUMBLE_END(ts, INTERVAL'1'MINUTE), 'HH:mm') as window_end,
COUNT(orderId)
FROM order_source
GROUP BY TUMBLE(ts, INTERVAL'1'MINUTE);
-- 讲解
DATE_FORMAT:对时间格式化,如:
DATE_FORMAT(TUMBLE_ROWTIME(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS')

创建统计结果视图

-- 创建按每1分钟统计分类订单数的视图
CREATE VIEW count_type_order_view AS 
SELECT
goodsType as goods_type,
COUNT(orderId) as order_num,
COUNT(userName) as user_num,
SUM(totalPrice) as sales_amount,
SUM(num) as count_num 
FROM order_source
GROUP BY TUMBLE(ts, INTERVAL'1'MINUTE), goodsType;
-- 讲解
GROUPBYTUMBLE(ts, INTERVAL '1' MINUTE), goodsType:组合分组统计字段:ts时间属性(1分钟内所有数据)和goodsTypeTUMBLE函数:用在GROUPBY子句中,用来定义滚动窗口。

2.6 建立 JOB 任务

同样在 SQL 窗口执行以下 insert 后,并会在 flink 平台建立 job 任务,JOB 任务运行后,按窗口规则将统计结果从视图中插入到 sink 数据输出表。

-- 将视图数据插入到按每分统计的维度表 
insert into type_order_count_sink 
select goods_type,
cast(order_num asINT),
cast(user_num asINT),
cast(sales_amount asDECIMAL(12, 2)),
cast(count_num asINT)
from count_type_order_view;

2.7 常用 SHOW 命令

-- 目前 Flink SQL 支持下列 SHOW 语句:
SHOW CATALOGS
SHOW DATABASES 
SHOW TABLES 
SHOW VIEWS 
SHOW FUNCTIONS

结果展示

提交任务

flink数据统计,Flink,flink,sql,kafka,Powered by 金山文档

flink 集群运行 JOB 作业

flink数据统计,Flink,flink,sql,kafka,Powered by 金山文档

查询 Mysql 统计表

查询分类表输出的统计结果

SELECT * FROM`type_order_count`
flink数据统计,Flink,flink,sql,kafka,Powered by 金山文档

查询分钟表输出的统计结果

SELECT * FROM`min_order_count` ORDER BY time_str DESC
flink数据统计,Flink,flink,sql,kafka,Powered by 金山文档

问题处理

问题 1

[ERROR] Could not execute SQL statement. Reason: java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value. Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

处理:缺少包,补充:lib/flink-table-common-1.11.1.jar

问题 2

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

处理:缺少包,补充:lib/flink-connector-jdbc_2.12-1.11.1.jar

问题 3

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.

处理:流处理结果需要根据 时间属性 按照升序进行排序。

问题 4

-- 在 ts 上定义 watermark,ts 成为事件时间列 Caused by: java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.

处理:ts 时间类型不能为空,时间需要做格式转换,如:ts AS TO_TIMESTAMP (FROM_UNIXTIME (orderTimeSeries / 1000, 'yyyy-MM-dd HH:mm:ss'))

问题 5

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.order_sink do not match. Query schema: [goods_type: VARCHAR(2147483647), count_num: BIGINT NOT NULL] Sink schema: [id: BIGINT, goods_type: VARCHAR(2147483647), count_num: INT]

处理:count_num 查询结果类型为 BIGINT,而 sink 的输出表字段类型为 INT, 需强转

insert into order_sink select goods_type,cast(count_num as INT) as count_num from count_order_type_view;

问题 6

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.order_sink do not match. Query schema: [goods_type: VARCHAR(2147483647), count_num: INT NOT NULL] Sink schema: [id: BIGINT, goods_type: VARCHAR(2147483647), count_num: INT]

处理:注意:此处已是对 count_num 做了转换后的错误提示,从字段分析来看是 ID 缺少输入值,可以在用 0 来表示 id 的自增主键值(select 0 from table),为 0 则 mysql 识别会默认自增值代替;

问题 7

[ERROR] Could not execute SQL statement. Reason:java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

处理:当查询包含更新 / 删除记录时,请声明接收器表的主键。

问题 8

JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy Caused by: java.io.IOException: Cannot connect to the client to send back the stream Caused by: java.net.ConnectException: Connection refused

处理:如果在 flink WEB UI 上停止 job,则有可能会导制 kafka 和 mysql 拒绝连接,netstat -apn|grep 9092 查看端口,大量 TIME_WAIT 状态;因没有设置 job 的重启策略,则需要重启 flink 服务;

问题 9

关于 flinkSQL 中的并行度设置

处理:目前 flink 1.11 sql 是不支持 source/sink 并行度配置的,flink sql 中各算子并行度默认是根据 source 的 partition 数或文件数来决定的;文章来源地址https://www.toymoban.com/news/detail-697656.html

到了这里,关于Flink 系例之 SQL 案例 - 订单统计的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • XL-LightHouse 与 Flink 和 ClickHouse 流式大数据统计系统

    一个Flink任务只能并行处理一个或少数几个数据流,而XL-LightHouse一个任务可以并行处理数万个、几十万个数据流; 一个Flink任务只能实现一个或少数几个数据指标,而XL-LightHouse单个任务就能支撑大批量、数以万计的数据指标。 1、XL-LightHouse :  1、再也不需要用 Flink、Spark、

    2024年02月09日
    浏览(26)
  • Flink 系例 之 Connectors 连接 ElasticSearch

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作; 示例环境 示例数据源 (项目码云下载) Flink 系例 之 搭建开发环境与数据 示例模块 (pom.xml) Flink 系例 之 DataStream Connectors 与 示例模块 数据流输入

    2024年02月16日
    浏览(37)
  • Flink 系例 之 Dashboard 安装与界面操作

    前期入门讲解了需多常用算子、方法、和连接器的使用与代码示例,本文讲解如何安装 Apache Flink Dashboard 平台与界面基本操作。 Apache Flink Dashboard 是 Flink 的客户端管理控制平台,也是 JOB 运行时任务跟踪与管理的显示平台。撑握基本的应用是学习 Flink 必备的,通过管理控制台

    2024年02月05日
    浏览(29)
  • Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

    Flink TTL(Time To Live)是一种机制,用于设置数据的过期时间,控制数据在内存或状态中的存活时间。通过设置TTL,可以自动删除过期的数据,从而释放资源并提高性能。 在Flink中,TTL可以应用于不同的组件和场景,包括窗口、状态和表。 窗口:对于窗口操作,可以将TTL应用于

    2024年02月03日
    浏览(46)
  • 【Java 动态数据统计图】动态数据统计思路案例(动态,排序,数组)一(112)

    需求: : 有一个ListMapString.Object,存储了某年某月的数据, 数据是根据用户查询条件进行显示的;所以查询的数据是动态的; 需按月份统计每个年月数据出现的次数,并且按照月份排序; 统计周期不超过一年(前端页面控制); 分析: : 后端给前端返回值:不能用对象返

    2024年02月12日
    浏览(36)
  • 【Java 动态数据统计图】动态数据统计思路案例(动态,排序,数组)四(116)

    需求: :前端根据后端的返回数据:画统计图; 1.动态获取地域数据以及数据中的平均值,按照平均值降序排序; 说明: X轴是动态的,有对应区域数据则展示; X轴 区域数据降序排序; 代码: Postman接口测试: 总结: 代码中有一个地域的数组:reginArr ,如果地域字典新增

    2024年02月12日
    浏览(38)
  • 【Java 动态数据统计图】动态数据统计思路案例(动态,排序,动态数组(重点推荐))七(129)

    需求 :前端根据后端的返回数据:画统计图; 说明: 1.X轴为地域,Y轴为地域出现的次数; 2. 动态展示(有地域展示,没有不展示,且高低排序) Demo案例 : 测试输出 : 案例二 : postman接口测试 :

    2024年02月10日
    浏览(35)
  • Apache ECharts-数据统计(详解、入门案例)

    简介:Apache ECharts 是一款基于 Javascript 的数据可视化图表库,提供直观,生动,可交互,可个性化定制的数据可视化图表。 图 1.1  Apache ECharts 功能、运行环境 功能:         ECharts,一个使用 JavaScript 实现的开源可视化库,可以流畅的运行在 PC 和移动设备上,兼容当前

    2024年04月13日
    浏览(34)
  • 编写sql统计一段时间内,每天、每月、每年的统计数据(PostgreSQL)

    前言 在做数据统计页面时,总会遇到统计某段时间内,每天、每月、每年的数据视图(柱状图、折线图等)。这些统计数据一眼看过去也简单呀,不就是按照时间周期(天、月、年)对统计数据进行分个组就完了嘛?但是会有一个问题,简单的写个sql对周期分组,获取到的统

    2024年02月12日
    浏览(28)
  • 数据处理与统计分析——MySQL与SQL

    数据库:DB(DataBase) 概念 :数据仓库,软件,安装在操作系统之上 作用 :存储数据,管理数据 关系型数据库:SQL(Structured Query Language) MySQL、Oracle、Sql Server、DB2、SQLlite 通过表和表之间,行和列之间的关系进行数据的存储 通过外键关联来建立表与表之间的关系 非关系型

    2024年02月12日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包