Flink TableAPI Aggregation And DataType

这篇具有很好参考价值的文章主要介绍了Flink TableAPI Aggregation And DataType。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

序言

这里整理下聚合的优化选项 以及 数据类型

Stream Aggregation

SQL 是数据分析中使用最广泛的语言。Flink Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。此外,Flink Table API 和 SQL 是高效优化过的,它集成了许多查询优化和算子优化。但并不是所有的优化都是默认开启的,因此对于某些工作负载,可以通过打开某些选项来提高性能。

虽然在介绍TableApi 但是在StreamApi的算子上也可以借鉴他们优化的处理方式cuiyaonan2000@163.com

MiniBatch 聚合 #

默认情况下,无界聚合算子是逐条处理输入的记录,即:(1)从状态中读取累加器,(2)累加/撤回记录至累加器,(3)将累加器写回状态,(4)下一条记录将再次从(1)开始处理。这种处理模式可能会增加 StateBackend 开销(尤其是对于 RocksDB StateBackend )。此外,生产中非常常见的数据倾斜会使这个问题恶化,并且容易导致 job 发生反压。

MiniBatch 聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个 key 只需一个操作即可访问状态。这样可以大大减少状态开销并获得更好的吞吐量。但是,这可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。---这中跟windows很像,就是根据一段时间来处理,那跟window是否有冲突呢cuiyaonan2000@163.com????

下图说明了 mini-batch 聚合如何减少状态操作。

Flink TableAPI Aggregation And DataType,Big Data Computing,Flink

代码中开启优化

// instantiate table environment
TableEnvironment tEnv = ...;

// access flink configuration
TableConfig configuration = tEnv.getConfig();
// set low-level key-value options
configuration.set("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
configuration.set("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
configuration.set("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task

Local-Global 聚合 #

Local-Global 聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于 MapReduce 中的 Combine + Reduce 模式。例如,就以下 SQL 而言:

SELECT color, sum(id)
FROM T
GROUP BY color

数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同 key 的输入数据累加到单个累加器中。全局聚合将仅接收 reduce 后的累加器,而不是大量的原始输入数据。这可以大大减少网络 shuffle 和状态访问的成本。每次本地聚合累积的输入数据量基于 mini-batch 间隔。这意味着 local-global 聚合依赖于启用了 mini-batch 优化。

下图显示了 local-global 聚合如何提高性能。

Flink TableAPI Aggregation And DataType,Big Data Computing,Flink

 代码开启,这个我们在streamApi中可以借鉴

// instantiate table environment
TableEnvironment tEnv = ...;

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation

拆分 distinct 聚合 #

Local-Global 优化可有效消除常规聚合的数据倾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在处理 distinct 聚合时,其性能并不令人满意。---是丢Local-Global的补充优化

例如,如果我们要分析今天有多少唯一用户登录。我们可能有以下查询:

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

如果 distinct key (即 user_id)的值分布稀疏,则 COUNT DISTINCT 不适合减少数据。即使启用了 local-global 优化也没有太大帮助。因为累加器仍然包含几乎所有原始记录,并且全局聚合将成为瓶颈(大多数繁重的累加器由一个任务处理,即同一天)。

这个优化的想法是将不同的聚合(例如 COUNT(DISTINCT col))分为两个级别。第一次聚合由 group key 和额外的 bucket key 进行 shuffle。bucket key 是使用 HASH_CODE(distinct_key) % BUCKET_NUM 计算的。BUCKET_NUM 默认为1024,可以通过 table.optimizer.distinct-agg.split.bucket-num 选项进行配置。第二次聚合是由原始 group key 进行 shuffle,并使用 SUM 聚合来自不同 buckets 的 COUNT DISTINCT 值。由于相同的 distinct key 将仅在同一 bucket 中计算,因此转换是等效的。bucket key 充当附加 group key 的角色,以分担 group key 中热点的负担。bucket key 使 job 具有可伸缩性来解决不同聚合中的数据倾斜/热点。

拆分 distinct 聚合后,以上查询将被自动改写为以下查询:

SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

下图显示了拆分 distinct 聚合如何提高性能(假设颜色表示 days,字母表示 user_id)。

Flink TableAPI Aggregation And DataType,Big Data Computing,Flink

开启代码

// instantiate table environment
TableEnvironment tEnv = ...;

tEnv.getConfig()
  .set("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split

在 distinct 聚合上使用 FILTER 修饰符 #

在某些情况下,用户可能需要从不同维度计算 UV(独立访客)的数量,例如来自 Android 的 UV、iPhone 的 UV、Web 的 UV 和总 UV。很多人会选择 CASE WHEN,例如:

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

但是,在这种情况下,建议使用 FILTER 语法而不是 CASE WHEN。因为 FILTER 更符合 SQL 标准,并且能获得更多的性能提升。FILTER 是用于聚合函数的修饰符,用于限制聚合中使用的值。将上面的示例替换为 FILTER 修饰符,如下所示:

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在 user_id 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。

数据类型

数据类型: 数据类型 | Apache Flink

时间类型: 时区 | Apache Flink文章来源地址https://www.toymoban.com/news/detail-575090.html

到了这里,关于Flink TableAPI Aggregation And DataType的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Introduction to Flink Streaming Platform for Big Data

    作者:禅与计算机程序设计艺术 Flink是一个开源的分布式流处理框架,它允许快速轻松地进行实时数据处理,提供了一个完整的数据流程解决方案。它支持低延迟的实时数据计算、高吞吐量的实时数据传输以及复杂事件处理(CEP)。Flink在Apache顶级项目中排名第二,同时也被很多

    2024年02月07日
    浏览(8)
  • Beyond Big Data: New Applications in the Age of 5G and

    作者:禅与计算机程序设计艺术 随着经济、科技和社会的快速发展,信息技术正在改变我们的生活。从20世纪70年代开始,大数据技术已经成为热门话题。基于大数据的应用如搜索引擎、推荐系统、图像识别、地图导航等已经发展出一批商业化产品。但在最近几年里,随着5

    2024年02月08日
    浏览(7)
  • “大数据处理”的现状 Scaling up and out: Towards an efficient processing of big Data

    作者:禅与计算机程序设计艺术 Hadoop 是 Apache 基金会于 2007 年推出的开源分布式计算框架。它是一个通用计算平台,可用于存储、处理和分析大量的数据集。它是一个分布式文件系统(HDFS),一个资源管理器(YARN),和一些常用的组件如 MapReduce、Hive 和 Pig。在数据量达到海

    2024年02月08日
    浏览(8)
  • 【大数据】数据分析和挖掘技术和应用 A Brief Review of Big Data Technologies and Application

    作者:禅与计算机程序设计艺术 在现代信息社会里,数据的爆炸性增长已经给传统行业带来巨大的商机,并促进了人工智能、机器学习、云计算等新兴技术的出现。作为数据驱动的经济领域,数据分析和挖掘技术成为绩效提升和产品优化的关键环节,也是各个公司争相追逐的

    2024年02月07日
    浏览(14)
  • 【文献分享】Big data and benchmarking initiatives to bridge the gap from AlphaFold to drug design

    【文献分享】Big data and benchmarking initiatives to bridge the gap from AlphaFold to drug design

    今天来精读一篇发在《Nature Chemical Biology》上的最新评论,题目为: Big data and benchmarking initiatives to bridge the gap from AlphaFold to drug design  原文链接如下: Big data and benchmarking initiatives to bridge the gap from AlphaFold to drug design | Nature Chemical Biology https://www.nature.com/articles/s41589-024-01570

    2024年03月28日
    浏览(9)
  • 2 Data Streaming Pipelines With Flink and Kafka

    作者:禅与计算机程序设计艺术 数据流是一个连续不断的、产生、存储和处理数据的过程。传统上,数据流编程都是基于特定平台(比如:消息队列,数据仓库,事件溯源)的SDK或者API进行开发,但随着云计算和容器技术的发展,越来越多的企业选择使用开源工具实现自己的

    2024年02月08日
    浏览(6)
  • Streamlining Your Data Pipeline with Databricks and Apache Flink

    大数据技术在过去的几年里发展迅速,成为了企业和组织中不可或缺的一部分。随着数据的规模和复杂性的增加,传统的数据处理技术已经无法满足需求。为了解决这个问题,我们需要一种更高效、可扩展的数据处理框架。 Databricks 和 Apache Flink 是两个非常受欢迎的开源项目

    2024年02月22日
    浏览(8)
  • Flink TableAPI 依赖问题

    “Could not instantiate the executor. Make sure a planner module is on the classpath” 添加依赖

    2024年02月12日
    浏览(7)
  • Flink实践代码-TableAPI 与 DataStream 互转

    1.代码与含义解释 1.1 思路 Flink 获取数据流后,需要做数据过滤那么首先就要有一下几个步骤: 构建运行环境 接入数据流 TableAPI 与 DataStream 互转,实现 SQL 查询 1.2 直接上代码

    2024年04月11日
    浏览(6)
  • 【Flink系列七】TableAPI和FlinkSQL初体验

    【Flink系列七】TableAPI和FlinkSQL初体验

    Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理

    2024年02月03日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包