窗口函数就是对一个窗口内的数据的操作处理。Flink的窗口函数分为两类:
- 窗口聚合函数:ReduceFunction和AggregateFunction,来一条聚合一条,只在窗口关闭时才会输出
- 全窗口处理函数:ProcessWindowFunction,来一条保存一条,只有在窗口关闭的时候才聚合或者处理,输出结果
ReduceFuntion
注意:
- 窗口的reduce() 时WindowedStream的聚合函数,而不是KeyedStream的reduce()聚合函数。
- Flink可以使用ReduceFunction增量地聚合窗口的元素。
package 复习
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* ReduceFunction
* 需求:
* 获取某天某省---每10S新增最大记录值
2022-5-18 beijing 2
2022-5-18 beijing 3
2022-5-18 shanghai 6
*/
object Window_ReduceFunction {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.socketTextStream("localhost", 6666)
.map(x => {
val fields: Array[String] = x.split(" ")
val date = fields(0).trim
val province = fields(1)
val ts = fields(2).trim.toInt
(date + "_" + province, ts)
}).keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction[(String,Int)]{
override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
if(t._2>t1._2){
t
}else{
t1
}
}
}).print()
env.execute("test")
}
}
AggregateFunction
注意:
- AggregateFunction比ReduceFunction更加灵活。是reduceFunction的一种特殊实现。
- AggregateFunction<泛型1,泛型2,泛型3>,泛型1 是输入类型,泛型2是累加器结果类型,泛型3是输出类型。
- 需要重写四个方法:累加器的初始化,聚合逻辑,获取结果,会话窗口的合并窗口
- Flink也可以使用AggregateFunction增量地聚合窗口的元素
package 复习
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* AggregateFunction
*
* 窗口增量聚合函数:
* 需求:
* 获取某天某省---每10S平均新增记录
2022-5-18 beijing 2
2022-5-18 beijing 3
2022-5-18 shanghai 6
*
*/
object Window_AggregateFunction {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.socketTextStream("localhost", 6666)
.map(x => {
val fields: Array[String] = x.split(" ")
val date = fields(0).trim
val province = fields(1)
val ts = fields(2).trim.toInt
(date + "_" + province, ts)
}).keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// 输入类型,累加器,输出类型
.aggregate(new AggregateFunction[(String,Int),(String,Int,Int),(String,Int)]{
// 初始化
override def createAccumulator(): (String, Int, Int) = ("",0,0)
// 单个增加
override def add(in: (String, Int), acc: (String, Int, Int)): (String, Int, Int) ={
val cnt = acc._2+1
val adds = acc._3+in._2
(in._1,cnt,adds)
}
// 多个分区(窗口)进行合并
override def merge(acc: (String, Int, Int), acc1: (String, Int, Int)): (String, Int, Int) = {
val mergecnt = acc._2+acc1._2
val mergeadd = acc._3+acc1._3
(acc._1,mergecnt,mergeadd)
}
// 获取结果 这里的string 我将个数和sum全部输出出来方便查看
override def getResult(acc: (String, Int, Int)): (String, Int) = {
(acc._1+"_"+acc._2+"_"+acc._3,acc._3 / acc._2)
}
}).print()
env.execute("test 08")
}}
ProcessWIndowFunction
注意:
- 全窗口函数-process(new ProcessWindowFunction)获取一个包含窗口所有元素的iterable,以及一个可以访问时间和状态信息的Context对象,这使得它比其他窗口函数提供了更多的灵活性。
- ProcessWindowFunction以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到窗口逻辑可以处理为止。
窗口比较大,或者数据量比较大,不建议使用,会占用更多的内存
package 复习
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
* AggregateFunction
*
* 窗口增量聚合函数:
* 需求:
* 获取某天某省---每10S平均新增记录
2022-5-18 beijing 2
2022-5-18 beijing 3
2022-5-18 shanghai 6
*
*/
object Window_ProcessWindowFunction {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.socketTextStream("localhost", 6666)
.map(x => {
val fields: Array[String] = x.split(" ")
val date = fields(0).trim
val province = fields(1)
val ts = fields(2).trim.toInt
(date + "_" + province, ts)
}).keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
var cnt = 0
var totalAdds = 0
elements.foreach(x =>{
cnt+=1
totalAdds = x._2+totalAdds
})
// 这里也是将计数和sum放在了string里。
out.collect((key+"_"+cnt+"_"+totalAdds),totalAdds/cnt)
}
}).print()
env.execute("test 09")
}
}
窗口增量聚合处理函数
ProcessWindowFunction可以与ReduceFunction或AggregateFunction结合在一起,在元素到达窗口时进行增量。当窗口关闭时,ProcessWindowFunction将提供聚合的结果,这允许它在访问ProcessWindowFunction的附加窗口元信息的同时,递增地计算窗口。你也可以使用旧的WindowFunction而不是ProcessWindowFunction来增加窗口聚合。
使用ReduceFunction与WindowFunction组合,返回窗口中最大的事件以及窗口的开始时间
package 复习
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object Window_ReduceFunction_And_Process {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.socketTextStream("localhost", 6666)
.map(x => {
val fields: Array[String] = x.split(" ")
val date = fields(0).trim
val province = fields(1)
val ts = fields(2).trim.toInt
(date + "_" + province, ts)
}).keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce((r1: (String, Int), r2: (String, Int)) => {
if (r1._2 > r2._2) {
r1
} else {
r2
}
},
(key: String,
window: TimeWindow,
maxReadings: Iterable[(String, Int)],
out: Collector[String]) => {
val max = maxReadings.iterator.next()
out.collect(window.getStart + "," + max._1 + "," + max._2)
})
.print()
env.execute("test 1")
}
}
Reduce&Aggregate&Process窗口函数区别
相同
- 针对窗口里面的数据做聚合或者其他处理
不同
- Reduce:聚合,通用性较强,可以使用多个function包括ReduceFunction,AggregateFunction,FoldFunction or ProcessWindowFunction 支持增量计算
- Aggregate:是一种Reduce的通用特例,多用于累加;支持增量计算。
- Process:比Redeuce和Aggregate更灵活,除聚合外可以做更多操作处理;不支持增量(即全量),窗口较大时不建议
ProcessWIndowFunction和WIndowFunction的区别
ProcessWindowFunction
-
可以访问Keyed State(就像任何富函数一样,你们想象富函数是否能访问状态)。
-
ProcessWindowFunction还可以将当前窗口外的Keyed State使用到当前正在处理的窗口函数中。换句话就是不同窗口数据调用。
-
该函数的process()调用接收到Context对象上的两个方法,它们允许访问两种类型的状态:
- globalState():它允许访问执行窗口之外的Keyed State。
- windowState():它允许访问执行窗口域内的Keyed State。
-
当使用窗口状态时,清除窗口状态也很重要。这应该在clear()方法中发生。文章来源:https://www.toymoban.com/news/detail-834105.html
WindowFunction文章来源地址https://www.toymoban.com/news/detail-834105.html
- 使用WindowFunction的地方,你都能使用ProcessWindowFunction。
- WindowFunction是ProcessWindowFunction的一个旧版本,它提供了较少的上下文信息,并且没有一些高级特性,比如每个窗口的Keyed State。
- 这个接口将在某个时候被弃用。
到了这里,关于Flink 中Window Functions的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!