Flink 中Window Functions

这篇具有很好参考价值的文章主要介绍了Flink 中Window Functions。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

窗口函数就是对一个窗口内的数据的操作处理。Flink的窗口函数分为两类:

  • 窗口聚合函数:ReduceFunction和AggregateFunction,来一条聚合一条,只在窗口关闭时才会输出
  • 全窗口处理函数:ProcessWindowFunction,来一条保存一条,只有在窗口关闭的时候才聚合或者处理,输出结果
    Flink 中Window Functions,flink,大数据

ReduceFuntion

注意:

  • 窗口的reduce() 时WindowedStream的聚合函数,而不是KeyedStream的reduce()聚合函数。
  • Flink可以使用ReduceFunction增量地聚合窗口的元素。
package 复习
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


/**
 * ReduceFunction
 * 需求:
 * 获取某天某省---每10S新增最大记录值
    2022-5-18 beijing 2
    2022-5-18 beijing 3
    2022-5-18 shanghai 6
 */


object Window_ReduceFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.socketTextStream("localhost", 6666)
      .map(x => {
        val fields: Array[String] = x.split(" ")
        val date = fields(0).trim
        val province = fields(1)
        val ts = fields(2).trim.toInt
        (date + "_" + province, ts)
      }).keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce(new ReduceFunction[(String,Int)]{
        override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
          if(t._2>t1._2){
            t
          }else{
            t1
          }
        }
      }).print()


    env.execute("test")


  }
}

AggregateFunction

注意:

  • AggregateFunction比ReduceFunction更加灵活。是reduceFunction的一种特殊实现。
    • AggregateFunction<泛型1,泛型2,泛型3>,泛型1 是输入类型,泛型2是累加器结果类型,泛型3是输出类型。
    • 需要重写四个方法:累加器的初始化,聚合逻辑,获取结果,会话窗口的合并窗口
  • Flink也可以使用AggregateFunction增量地聚合窗口的元素
package 复习
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


/**
 * AggregateFunction
 *
 * 窗口增量聚合函数:
 * 需求:
 * 获取某天某省---每10S平均新增记录
    2022-5-18 beijing 2
    2022-5-18 beijing 3
    2022-5-18 shanghai 6
 *
 */
object Window_AggregateFunction {
  def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val ds = env.socketTextStream("localhost", 6666)
    .map(x => {
      val fields: Array[String] = x.split(" ")
      val date = fields(0).trim
      val province = fields(1)
      val ts = fields(2).trim.toInt
      (date + "_" + province, ts)
    }).keyBy(_._1)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
//    输入类型,累加器,输出类型
    .aggregate(new AggregateFunction[(String,Int),(String,Int,Int),(String,Int)]{
//      初始化
      override def createAccumulator(): (String, Int, Int) = ("",0,0)
//     单个增加
      override def add(in: (String, Int), acc: (String, Int, Int)): (String, Int, Int) ={
        val cnt = acc._2+1
        val adds = acc._3+in._2
        (in._1,cnt,adds)
      }
//    多个分区(窗口)进行合并
      override def merge(acc: (String, Int, Int), acc1: (String, Int, Int)): (String, Int, Int) = {
        val mergecnt = acc._2+acc1._2
        val mergeadd = acc._3+acc1._3
        (acc._1,mergecnt,mergeadd)
      }
//      获取结果    这里的string 我将个数和sum全部输出出来方便查看
      override def getResult(acc: (String, Int, Int)): (String, Int) = {
        (acc._1+"_"+acc._2+"_"+acc._3,acc._3 / acc._2)
      }
    }).print()

    env.execute("test 08")
}}

ProcessWIndowFunction

注意:

  • 全窗口函数-process(new ProcessWindowFunction)获取一个包含窗口所有元素的iterable,以及一个可以访问时间和状态信息的Context对象,这使得它比其他窗口函数提供了更多的灵活性。
  • ProcessWindowFunction以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到窗口逻辑可以处理为止。
    窗口比较大,或者数据量比较大,不建议使用,会占用更多的内存
package 复习
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * AggregateFunction
 *
 * 窗口增量聚合函数:
 * 需求:
 * 获取某天某省---每10S平均新增记录
    2022-5-18 beijing 2
    2022-5-18 beijing 3
    2022-5-18 shanghai 6
 *
 */
object Window_ProcessWindowFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.socketTextStream("localhost", 6666)
      .map(x => {
        val fields: Array[String] = x.split(" ")
        val date = fields(0).trim
        val province = fields(1)
        val ts = fields(2).trim.toInt
        (date + "_" + province, ts)
      }).keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .process(new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
        override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
          var cnt = 0
          var totalAdds = 0
          elements.foreach(x =>{
            cnt+=1
            totalAdds = x._2+totalAdds
          })
//          这里也是将计数和sum放在了string里。
          out.collect((key+"_"+cnt+"_"+totalAdds),totalAdds/cnt)
        }
      }).print()



    env.execute("test 09")
  }

}

窗口增量聚合处理函数

ProcessWindowFunction可以与ReduceFunction或AggregateFunction结合在一起,在元素到达窗口时进行增量。当窗口关闭时,ProcessWindowFunction将提供聚合的结果,这允许它在访问ProcessWindowFunction的附加窗口元信息的同时,递增地计算窗口。你也可以使用旧的WindowFunction而不是ProcessWindowFunction来增加窗口聚合。

使用ReduceFunction与WindowFunction组合,返回窗口中最大的事件以及窗口的开始时间

package 复习
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


object Window_ReduceFunction_And_Process {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.socketTextStream("localhost", 6666)
      .map(x => {
        val fields: Array[String] = x.split(" ")
        val date = fields(0).trim
        val province = fields(1)
        val ts = fields(2).trim.toInt
        (date + "_" + province, ts)
      }).keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce((r1: (String, Int), r2: (String, Int)) => {
        if (r1._2 > r2._2) {
          r1
        } else {
          r2
        }
      },
        (key: String,
         window: TimeWindow,
         maxReadings: Iterable[(String, Int)],
         out: Collector[String]) => {
          val max = maxReadings.iterator.next()
          out.collect(window.getStart + "," + max._1 + "," + max._2)
        })
      .print()


    env.execute("test 1")
  }

}

Reduce&Aggregate&Process窗口函数区别

相同

  • 针对窗口里面的数据做聚合或者其他处理

不同

  • Reduce:聚合,通用性较强,可以使用多个function包括ReduceFunction,AggregateFunction,FoldFunction or ProcessWindowFunction 支持增量计算
  • Aggregate:是一种Reduce的通用特例,多用于累加;支持增量计算。
  • Process:比Redeuce和Aggregate更灵活,除聚合外可以做更多操作处理;不支持增量(即全量),窗口较大时不建议

ProcessWIndowFunction和WIndowFunction的区别

ProcessWindowFunction

  • 可以访问Keyed State(就像任何富函数一样,你们想象富函数是否能访问状态)。

  • ProcessWindowFunction还可以将当前窗口外的Keyed State使用到当前正在处理的窗口函数中。换句话就是不同窗口数据调用。

  • 该函数的process()调用接收到Context对象上的两个方法,它们允许访问两种类型的状态:

    • globalState():它允许访问执行窗口之外的Keyed State。
    • windowState():它允许访问执行窗口域内的Keyed State。
  • 当使用窗口状态时,清除窗口状态也很重要。这应该在clear()方法中发生。

WindowFunction文章来源地址https://www.toymoban.com/news/detail-834105.html

  • 使用WindowFunction的地方,你都能使用ProcessWindowFunction。
  • WindowFunction是ProcessWindowFunction的一个旧版本,它提供了较少的上下文信息,并且没有一些高级特性,比如每个窗口的Keyed State。
  • 这个接口将在某个时候被弃用。

到了这里,关于Flink 中Window Functions的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月24日
    浏览(30)
  • Flink之Window窗口机制

    在大多数场景下,需要统计的数据流都是无界的,因此无法等待整个数据流终止后才进行统计。通常情况下,只需要对某个时间范围或者数量范围内的数据进行统计分析 例如: 因此,在Apache Flink中,窗口是对无界数据流进行有界处理的机制。窗口可以将无限的数据流划分为

    2024年02月06日
    浏览(32)
  • Flink窗口(2)—— Window API

    目录 窗口分配器 时间窗口 计数窗口 全局窗口 窗口函数 增量聚合函数 全窗口函数(full window functions) 增量聚合和全窗口函数的结合使用 Window API 主要由两部分构成: 窗口分配器 (Window Assigners)和 窗口函数 (Window Functions) 在window()方法中传入一个窗口分配器; 在aggreg

    2024年01月16日
    浏览(29)
  • flink作业 windowAll 转换window

    datastream 流中没有使用keyby需要使用windowAll函数,使用了keyby的需要使用window函数 windowAll的函数: 并行度只能是1,性能不高 window的函数:并行度可以任意,性能高 线上的flink作业的架构如下图所示: 1.先从rocketmq读取数据,通过windowAll类型的窗口进行10s的数据攒批; 2.攒批的数

    2024年01月18日
    浏览(26)
  • Flink-Window详细讲解

    当谈到实时数据处理和流式计算,Apache Flink 是一个备受推崇的工具,它提供了丰富的功能来处理连续的数据流。其中,窗口(Window)是 Flink 中一个关键的概念,它使得我们能够在有限的数据集上执行各种计算和分析操作。本文将深入介绍 Flink 窗口的不同类型、使用方法以及

    2024年02月13日
    浏览(29)
  • Flink window 源码分析4:WindowState

    本文源码为flink 1.18.0版本。 其他相关文章: Flink window 源码分析1:窗口整体执行流程 Flink window 源码分析2:Window 的主要组件 Flink window 源码分析3:WindowOperator Flink window 源码分析4:WindowState 主要考虑 reduce、aggregate 函数中的托管状态是在什么时候触发和使用的?使用时与Win

    2024年01月25日
    浏览(37)
  • Flink-Window详细讲解-countWindow

    一.countWindow和countWindowall区别 1.countWindow : 如果您使用 countWindow(5) ,这意味着您将数据流划分成多个大小为 5 的窗口。划分后的窗口如下: 窗口 1: [1, 2, 3, 4, 5] 窗口 2: [6, 7, 8, 9, 10] 当每个窗口中的元素数量达到 5 时,将触发计算。这意味着窗口 1 中的计算会在处理 5 个元素后

    2024年02月09日
    浏览(27)
  • flink的window和windowAll的区别

    在flink的窗口函数运用中,window和windowAll方法总是会引起混淆,特别是结合上GlobalWindow的组合时,更是如此,本文就来梳理下他们的区别和常见用法 window是KeyStream数据流的方法,其并行度是任意的,也就是最大可以和分组key的数量相同 windowAll是DataStream数据流的方法,其并行

    2024年01月25日
    浏览(27)
  • Flink TableAPI Window and Watermarket

    本次主要是弄清楚.批流统一 的处理方式,因为它是使用SQL来操作批流计算的.所以它怎么设置算子并行度?如何设置窗口?如何处理流式数据?等等 有很多疑问. 我还是觉得直接使用流计算的API更好.流批一体API最终也是转换成流式计算,最主要的是使用sql来设置算子或者窗口,并不直

    2024年02月12日
    浏览(28)
  • Flink window 源码分析1:窗口整体执行流程

    注:本文源码为flink 1.18.0版本。 其他相关文章: Flink window 源码分析1:窗口整体执行流程 Flink window 源码分析2:Window 的主要组件 Flink window 源码分析3:WindowOperator Flink window 源码分析4:WindowState Window 本质上就是借助状态后端缓存着一定时间段内的数据,然后在达到某些条件

    2024年01月16日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包