1.聚合算子简介
常见的聚合算子 sum,max,min等
聚合算子可以在在keyedStream 流上进行滚动的聚合(即累计的操作),而且同一个 keyedStream 流上只能调用一次 聚合算子
文章来源地址https://www.toymoban.com/news/detail-471425.html
sum 示例:
import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object aggregationTest { //defined the dataSource's type case class StockPrice(stockId:String, timeStamp:Long, price:Double) def main(args: Array[String]): Unit = { //create env val env = StreamExecutionEnvironment.getExecutionEnvironment //generate ds val stockList = List(StockPrice("stock_1", 66666, 1) , StockPrice("stock_1", 8888, 2) , StockPrice("stock_2", 77777, 1) , StockPrice("stock_2", 999, 3) , StockPrice("stock_3", 3333, 1) ) val ds = env.fromCollection(stockList) //transformation val keyedStream = ds.keyBy("stockId") val sumedStream = keyedStream.sum(2) sumedStream.print() env.execute() } }
输出结果:
max示例:
import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment //defined the dataSource's type case class StockPrice(stockId:String, timeStamp:Long, price:Double) object maxTest { def main(args: Array[String]): Unit = { //create env val env = StreamExecutionEnvironment.getExecutionEnvironment //generate ds val stockList = List(StockPrice("stock_1", 66666, 1) , StockPrice("stock_1", 8888, 2) , StockPrice("stock_2", 77777, 1) , StockPrice("stock_2", 999, 3) , StockPrice("stock_3", 3333, 1) ) val ds = env.fromCollection(stockList) //transformation val keyedStream = ds.keyBy("stockId") val maxedStream = keyedStream.max(2) maxedStream.print() env.execute() } }
输出结果:
文章来源:https://www.toymoban.com/news/detail-471425.html
到了这里,关于Flink学习20:聚合算子(sum,max,min)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!