Flink核心API之DataStream(基础常用算子)
(一)Flink核心API
Flink中提供了4种不同层次的API,每种API在简洁和易表达之间有自己的权衡,适用于不同的场景。目前上面3个会用得比较多。
低级API(Stateful Stream Processing):提供了对时间和状态的细粒度控制,简洁性和易用性较差,
主要应用在一些复杂事件处理逻辑上。
核心API(DataStream/DataSet API):主要提供了针对流数据和批数据的处理,是对低级API进行了一
些封装,提供了filter、sum、max、min等高级函数,简单易用,所以这些API在工作中应用还是比
较广泛的。
Table API:一般与DataSet或者DataStream紧密关联,可以通过一个DataSet或DataStream创建出一个Table,然后再使用类似于filter, join,或者 select这种操作。最后还可以将一个Table对象转成DataSet或DataStream。
SQL:Flink的SQL底层是基于Apache Calcite,Apache Calcite实现了标准的SQL,使用起来比其他API更加灵活,因为可以直接使用SQL语句。Table API和SQL可以很容易地结合在一块使用,因为它们都返回Table对象。
本文讲述DataStream一些基础API
DataStream API主要分为3块:DataSource、Transformation、DataSink。
DataSource是程序的输入数据源。
Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap和filter等操作。
DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中
(1)DataSoure
首先我们可以用 fromElements方法来自己创建一些数据
import org.apache.flink.streaming.api.scala._
object sourceBoundedTest {
def main(args: Array[String]): Unit = {
val env =StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 1.从元素中读取数据
val stream:DataStream[Int] = env.fromElements(1, 2, 3, 4, 5)
(2)Transformation
transformation是Flink程序的计算算子,负责对数据进行处理,Flink提供了大量的算子,其实Flink中的大部分算子的使用和spark中算子的使用是一样的,下面我们来看一下一些基础比较常用的算子:
算子 解释
map 输入一个元素进行处理,返回一个元素
flatMap 输入一个元素进行处理,可以返回多个元素
filter 对数据进行过滤,符合条件的数据会被留下
keyBy 根据key分组,相同key的数据会进入同一个分区
reduce 对当前元素和上一次的结果进行聚合操作
【1】map算子
首先创建一个样例类
case class Event(user:String,url:String,timestamp:Long)
咱们用map打印出user的内容
代码展示为:
object TransFormationTest {
def main(args: Array[String]): Unit = {
val env= StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream:DataStream[Event] = env.fromElements(Event("Mary","./home",1000L),Event("Bob","./cart",2000L))
stream.map(_.user).print("1")
env.execute()
打印结果为
[2].过滤(filter)
filter()转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤
条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉
进行filter()转换之后的新数据流的数据类型与原数据流是相同的。filter()转换需要传入的
参数需要实现FilterFunction 接口,而 FilterFunction 内要实现 filter()方法,就相当于一个返回
布尔类型的条件表达式。
下面的代码会将数据流中用户Mary和Bob的浏览行为过滤出来
package DataStreamApi
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
//import org.apache.spark.api.java.function.FilterFunction
import org.apache.flink.api.common.functions._
object TransformFliter {
def main(args: Array[String]): Unit = {
val env =StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream:DataStream[Event] = env.fromElements(Event("Mary","./home",1000L),Event("Bob","./cart",2000L))
//1.使用匿名函数
stream.filter(_.user=="Mary").print("1")
// 2.实现filterFunction接口
stream.filter(new UserFilter).print("2")
env.execute()
}
class UserFilter extends FilterFunction[Event]{
override def filter(t:Event):Boolean =t.user=="Bob"
}
}
运行结果
【3】扁平映射(flatMap)
flatMap()操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个
一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap()可以认为是“扁平化”
(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,
再对拆分后的元素做转换处理,如图5-7所示。我们此前WordCount程序的第一步分词操作,
就用到了flatMap()。
同map()一样,flatMap()也可以使用Lambda表达式或者FlatMapFunction接口实现类的方
式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流的数据类型相同,也
可以不同。
flatMap()的使用非常灵活,可以对结果进行任意输出,下面就是一个例子:
package DataStreamApi
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util
import java.util.stream.Collector
object TransFormFlatMap {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream:DataStream[Event]=env.fromElements(Event("Mary","./home",1000L),Event("Kevin","./root",2000L))
stream.flatMap(new myFLatMap).print()
env.execute()
}
//自定义实现flatMap接口
class myFLatMap extends FlatMapFunction[Event,String]{
override def flatMap(t: Event, collector: util.Collector[String]): Unit = {
// 如果当年是marry点击事件,那么直接输出user
if(t.user=="Mary"){
collector.collect(t.user)
}
// 如果是Kevin的点击事件
else if (t.user=="Kevin"){
collector.collect(t.user)
collector.collect(t.url)
}
}
}
}
打印结果为
聚合算子(Aggregation)
1.按键分区(keyBy)
对于Flink而言,DataStream 是没有直接进行聚合的API的。因为我们对海量数据做聚合
肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;
这个操作就是通过keyBy()来完成的。
keyBy()是聚合前必须要用到的一个算子。keyBy()通过指定键(key),可以将一条流从逻
辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对
应着任务槽(task slots)。
基于不同的key,流中的数据将被分配到不同的分区中去,如图5-8所示;这样一来,所
有具有相同的key的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个slot
中进行处理了。
import org.apache.flink.streaming.api.scala._
object TransKeyBy {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.fromElements(
Event("Mary", "./home", 1000L),
Event("Bob", "./cart", 2000L)
)
//指定Event的user属性作为key
val keyedStream = stream.keyBy(_.user)
keyedStream.print()
env.execute()
}
}
2. 简单聚合
有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们
内置实现了一些最基本、最简单的聚合API,主要有以下几种:
⚫ sum():在输入流上,对指定的字段做叠加求和的操作。
⚫ min():在输入流上,对指定的字段求最小值。
⚫ max():在输入流上,对指定的字段求最大值。
⚫ minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计
算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包
含字段最小值的整条数据。
⚫ maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与
min()/minBy()完全一致。
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;
但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字
段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字
段的名称,是以_1、_2、_3、…来命名的。
例如,下面就是对元组数据流进行聚合的测试:
import org.apache.flink.streaming.api.scala._
object TransTupleAggregation {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.fromElements(
("a", 1), ("a", 3), ("b", 3), ("b", 4)
)
stream.keyBy(_._1).sum(1).print() //对元组的索引1位置数据求和
stream.keyBy(_._1).sum("_2").print() //对元组的第2个位置数据求和
stream.keyBy(_._1).max(1).print() //对元组的索引1位置求最大值
stream.keyBy(_._1).max("_2").print() //对元组的第2个位置数据求最大值
stream.keyBy(_._1).min(1).print() //对元组的索引1位置求最小值
stream.keyBy(_._1).min("_2").print() //对元组的第2个位置数据求最小值
stream.keyBy(_._1).maxBy(1).print() //对元组的索引1位置求最大值
stream.keyBy(_._1).maxBy("_2").print() //对元组的第2个位置数据求最大值
stream.keyBy(_._1).minBy(1).print() //对元组的索引1位置求最小值
stream.keyBy(_._1).minBy("_2").print() //对元组的第2个位置数据求最小值
env.execute()
}
}
而如果数据流的类型是样例类,那么就只能通过字段名称来指定,不能通过位置来指定了。
package com.atguigu.chapter05
import org.apache.flink.streaming.api.scala._
object TransAggregationCaseClass {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.fromElements(
Event("Mary", "./home", 1000L),
Event("Bob", "./cart", 2000L)
)
// 使用user作为分组的字段,并计算最大的时间戳
stream.keyBy(_.user).max("timestamp").print()
env.execute()
}}
一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。
所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值
的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,
应该只用在含有有限个key的数据流上。
3. 归约聚合(reduce)
与简单聚合类似,reduce()操作也会将KeyedStream转换为DataStream。它不会改变流的
元素数据类型,所以输出类型和输入类型是一样的。
调用KeyedStream的reduce()方法时,需要传入一个参数,实现ReduceFunction接口。接
口在源码中的定义如下:
public interface ReduceFunction extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}
ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处
理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再
将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据,
这也就是reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果”
作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
下面我们来看一个稍复杂的例子。
我们将数据流按照用户id进行分区,然后用一个reduce()算子实现sum()的功能,统计每
个用户访问的频次;进而将所有统计结果分到一组,用另一个reduce()算子实现maxBy()的功
能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。
package DataStreamApi
import org.apache.flink.api.common.functions._
import org.apache.flink.streaming.api.scala._
object TransFormReduce {
def main(args: Array[String]): Unit = {
val env=StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val steam=env.fromElements(Event("marry","./portId=1",1000L)
,Event("Kevin","./portId=2",2000L)
,Event("Bob","./portId=3",3000L)
,Event("marry","./portId=1",4000L)
, Event("marry","./portId=1",6000L)
,Event("marry","./portId=1",5000L)
)
//reduce规约聚合操作
steam.map(data=>(data.user,1L))
.keyBy(_._1)
.reduce(new Mysum) //统计每个用户活跃度
.keyBy(data=>true) //将所有数据按照同样的key分到同一个组中
.reduce((state,data) => if (state._2>data._2) state else data) //选取当前最活跃的用户
.print()
env.execute()
}
class Mysum() extends ReduceFunction[(String,Long)]{
override def reduce(t: (String, Long), t1: (String, Long)): (String, Long) = (t._1,t._2+1)
}
}
代码运行结果
由此可见 marry的点击量最多文章来源:https://www.toymoban.com/news/detail-815477.html
reduce()同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以
我们需要将reduce()算子作用在一个有限key的流上。文章来源地址https://www.toymoban.com/news/detail-815477.html
到了这里,关于Flink核心API之DataStream(基础常用算子)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!