FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析

这篇具有很好参考价值的文章主要介绍了FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

FlinkSQL处理如下实时数据需求:
实时聚合不同 类型/账号/发布时间 的各个指标数据,比如:初始化/初始化后删除/初始化后取消/推送/成功/失败 的指标数据。要求实时产出指标数据,数据源是mysql cdc binlog数据。

代码实例

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

CREATE TABLE kafka_table (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),
     publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),
     msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),
     send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])
     --event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
     --WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 't1',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
   --  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交
  'format' = 'json'
);



CREATE TABLE es_sink(
     send_type      STRING
    ,account_id     STRING
    ,publish_time   STRING
    ,grouping_id       INTEGER
    ,init           INTEGER
    ,init_cancel    INTEGER
    ,push          INTEGER
    ,succ           INTEGER
    ,fail           INTEGER
    ,init_delete    INTEGER
    ,update_time    STRING
    ,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE view  tmp as
select
    send_type,
    account_id,
    publish_time,
    msg_status,
    case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,
    case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,
    case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,
    case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,
    case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,
    case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,
    event_time,
    opt,
    ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');


--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6

CREATE view  tmp_groupby as
select
 COALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1
         when send_type is not null and account_id is null and publish_time is null then 2
         when send_type is not null and account_id is not null and publish_time is null then 3
         when send_type is not null and account_id is not null and publish_time is not null then 4
         end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上

INSERT INTO es_sink
select
     send_type
    ,account_id
    ,publish_time
    ,grouping_id
    ,init
    ,init_cancel
    ,push
    ,succ
    ,fail
    ,init_delete
    ,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

其他配置

  • flink集群参数
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.localdir: /export/io_tmp_dirs/rocksdb
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
rest.flamegraph.enabled: true
pipeline.operator-chaining: false
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.network.min: 128 mb
taskmanager.memory.network.max: 128 mb
taskmanager.memory.framework.off-heap.size: 32mb
taskmanager.memory.task.off-heap.size: 32mb
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.fraction: 0.03
  • 检查点配置
    FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析,flink,大数据,flink

  • job运行资源
    管理节点(JM) 1 个, 节点规格 1 核 4 GB内存, 磁盘 10Gi
    运行节点(TM)10 个, 节点规格 1 核 4 GB内存, 磁盘 80Gi
    单TM槽位数(Slot): 1
    默认并行度:8

  • es mapping

#POST app_cust_syyy_private_domain_syyy_group_msg/app_cust_syyy_private_domain_syyy_group_msg/_mapping
{
    "app_cust_syyy_private_domain_syyy_group_msg": {
        "properties": {
            "send_type": {
                "type": "keyword",
                "ignore_above": 256
            },
            "account_id": {
                "type": "keyword"
            },
           "publish_time": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
           			"ignore_malformed":"true" # 忽略错误的各式
           		}
           	}
           },
            "grouping_id": {
                "type": "integer"
            },
            "init": {
                "type": "integer"
            },
            "init_cancel": {
                "type": "integer"
            },
            "query": {
                "type": "integer"
            },
            "succ": {
                "type": "integer"
            },
            "fail": {
                "type": "integer"
            },
            "init_delete": {
                "type": "integer"
            },
            "update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

性能调优

是否开启【MiniBatch 聚合】和【Local-Global 聚合】对分组聚合场景影响巨大,尤其是在数据量大的场景下。

  • 如果未开启,在分组聚合,数据更新状态时,每条数据都会触发聚合运算,进而更新StateBackend (尤其是对于 RocksDB StateBackend,火焰图上反映就是一直在update rocksdb),造成上游算子背压特别大。此外,生产中非常常见的数据倾斜会使这个问题恶化,并且容易导致 job 发生反压。
    FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析,flink,大数据,flink

  • 在开启【MiniBatch 聚合】和【Local-Global 聚合】后,配置如下:

--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

开启配置好会在DAG上添加两个环节MiniBatchAssignerLocalGroupAggregate
FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析,flink,大数据,flink

对结果的影响

开启了【MiniBatch 聚合】和【Local-Global 聚合】后,一天处理不完的数据,在10分钟内处理完毕

输出结果

FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析,flink,大数据,flinkFlinkSQL【分组聚合-多维分析-性能调优】应用实例分析,flink,大数据,flink

参考:
Group Aggregation
Streaming Aggregation Performance Tuning文章来源地址https://www.toymoban.com/news/detail-796621.html

到了这里,关于FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Redisson】分布式锁源码分析如何实现多个应用实例互斥

    lockName就是保存到Redis里面的key 直接进行构建方法里面的 super(commandExecutor, name); org.redisson.connection.ServiceManager : private final String id = UUID.randomUUID().toString(); 这个 id 就是 UUID : this.id = getServiceManager().getId(); 这个entryName通过UUID可以区分是哪个应用实例 entryName+threadId可以区分哪个应

    2024年02月11日
    浏览(42)
  • MySQL修炼手册4:分组与聚合:GROUP BY与HAVING的应用

    MySQL数据库的强大功能为我们提供了丰富的数据处理工具,其中GROUP BY与HAVING的应用使得数据的分组与聚合变得更加灵活和高效。在本篇博客中,我们将深入研究GROUP BY与HAVING的基础知识,并通过实际案例,展示它们在数据分析中的强大威力。 首先,为了更好地演示GROUP BY与

    2024年02月01日
    浏览(93)
  • Flink 优化(六) --------- FlinkSQL 调优

    FlinkSQL 官网配置参数: https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆炸。列举两个场景: ➢ FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在状态里,不

    2024年02月14日
    浏览(42)
  • Pix4Dmapper空间三维模型的应用实例:GIS选址分析

      本文介绍基于 无人机影像建模完成后的结果 ,利用 ArcMap 软件进行 空间选址分析 ,从而实现空间三维模型应用的方法。 目录 1 空间分析目标确立 2 基于基本约束条件的选址求解 2.1 坡度计算与提取 2.2 海拔提取 2.3 LAS数据初探 2.4 淹没分析 2.5 区域相交 2.6 面积约束 3 基于

    2024年02月04日
    浏览(41)
  • FlinkSQL 时间语义、窗口和聚合

    目录 一、时间语义 1.1 事件时间 1.1.1 在创建表的DDL中定义 1.1.2 在数据流转换为表时定义 1.2 处理时间  1.2.1 在创建表的DDL中定义 二、窗口  2.1 分组窗口(老版本,已经弃用,未来的版本中可能会删除) 2.2 窗口表值函数 (Windowing TVFs,新版本,从1.13起) 2.2.1 滚动窗口(TUMBLE) 2.2.2 滑动

    2024年02月16日
    浏览(45)
  • 【海量数据挖掘/数据分析】之 决策树模型(决策树模型、决策树构成、决策树常用算法、决策树性能要求、信息增益、信息增益计算公式、决策树信息增益计算实例)

    目录 【海量数据挖掘/数据分析】之 决策树模型(决策树模型、决策树构成、决策树常用算法、决策树性能要求、信息增益、信息增益计算公式、决策树信息增益计算实例) 一、决策树模型 1、常用算法 2、属性划分策略 3、其他算法 三、决策树算法性能要求 四、 决策树模型

    2024年02月13日
    浏览(58)
  • 【kafka性能测试脚本详解、性能测试、性能分析与性能调优】

    Apache Kafka 官方提供了两个客户端性能测试脚本,它们的存放位置如下: 生产者性能测试脚本:$KAFKA_HOME/bin/kafka-producer-perf-test.sh 消费者性能测试脚本:$KAFKA_HOME/bin/kafka-consumer-perf-test.sh kafka-producer-perf-test.sh 支持测试的性能指标包括:吞吐量(throughput)、最大时延(max-latenc

    2024年02月04日
    浏览(64)
  • 目标检测算法之YOLOv5的应用实例(零售业库存管理、无人机航拍分析、工业自动化领域应用的详解)

    在零售业库存管理中,YOLOv5可以帮助自动化商品识别和库存盘点过程。通过使用深度学习模型来实时识别货架上的商品,零售商可以更高效地管理库存,减少人工盘点的时间和成本。以下是一个使用YOLOv5进行商品识别的Python脚本示例:

    2024年02月20日
    浏览(79)
  • FlinkSQL聚合函数(Aggregate Function)详解

    使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。 上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法 。 案例场景: 关于饮料的表,有三个字段,分别是 id、name、price,表⾥有 5 ⾏数据,找到所有饮料⾥最贵的饮料的价格,即执⾏⼀个 max(

    2024年02月04日
    浏览(40)
  • redis性能测试及瓶颈分析调优

    一、简介 Redis(Remote Dictionary Server ),即远程字典服务,是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API mysql与redis的区别: 类型上mysql是关系型数据库,而redis是缓存数据库; 作用上mysql用于持久化的存储数

    2024年02月06日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包