Flink核心API之DataStream(基础常用算子)

这篇具有很好参考价值的文章主要介绍了Flink核心API之DataStream(基础常用算子)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink核心API之DataStream(基础常用算子)

(一)Flink核心API
Flink中提供了4种不同层次的API,每种API在简洁和易表达之间有自己的权衡,适用于不同的场景。目前上面3个会用得比较多。

低级API(Stateful Stream Processing):提供了对时间和状态的细粒度控制,简洁性和易用性较差,
主要应用在一些复杂事件处理逻辑上。
核心API(DataStream/DataSet API):主要提供了针对流数据和批数据的处理,是对低级API进行了一
些封装,提供了filter、sum、max、min等高级函数,简单易用,所以这些API在工作中应用还是比
较广泛的。
Table API:一般与DataSet或者DataStream紧密关联,可以通过一个DataSet或DataStream创建出一个Table,然后再使用类似于filter, join,或者 select这种操作。最后还可以将一个Table对象转成DataSet或DataStream。
SQL:Flink的SQL底层是基于Apache Calcite,Apache Calcite实现了标准的SQL,使用起来比其他API更加灵活,因为可以直接使用SQL语句。Table API和SQL可以很容易地结合在一块使用,因为它们都返回Table对象。
Flink核心API之DataStream(基础常用算子),flink,大数据

本文讲述DataStream一些基础API

DataStream API主要分为3块:DataSource、Transformation、DataSink。
DataSource是程序的输入数据源。
Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap和filter等操作。
DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中

(1)DataSoure

首先我们可以用 fromElements方法来自己创建一些数据

import org.apache.flink.streaming.api.scala._

object sourceBoundedTest {
  def main(args: Array[String]): Unit = {
    val env =StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //    1.从元素中读取数据
    val stream:DataStream[Int] = env.fromElements(1, 2, 3, 4, 5)

(2)Transformation

transformation是Flink程序的计算算子,负责对数据进行处理,Flink提供了大量的算子,其实Flink中的大部分算子的使用和spark中算子的使用是一样的,下面我们来看一下一些基础比较常用的算子:

算子 解释
map 输入一个元素进行处理,返回一个元素
flatMap 输入一个元素进行处理,可以返回多个元素
filter 对数据进行过滤,符合条件的数据会被留下
keyBy 根据key分组,相同key的数据会进入同一个分区
reduce 对当前元素和上一次的结果进行聚合操作

【1】map算子

首先创建一个样例类
case class Event(user:String,url:String,timestamp:Long)

咱们用map打印出user的内容
代码展示为:

object TransFormationTest {
  def main(args: Array[String]): Unit = {
  val env=  StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)

    val stream:DataStream[Event] = env.fromElements(Event("Mary","./home",1000L),Event("Bob","./cart",2000L))


    stream.map(_.user).print("1")

    env.execute()

打印结果为

Flink核心API之DataStream(基础常用算子),flink,大数据

[2].过滤(filter)

filter()转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤
条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉
Flink核心API之DataStream(基础常用算子),flink,大数据
进行filter()转换之后的新数据流的数据类型与原数据流是相同的。filter()转换需要传入的
参数需要实现FilterFunction 接口,而 FilterFunction 内要实现 filter()方法,就相当于一个返回
布尔类型的条件表达式。
下面的代码会将数据流中用户Mary和Bob的浏览行为过滤出来

package DataStreamApi

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
//import org.apache.spark.api.java.function.FilterFunction
import org.apache.flink.api.common.functions._
object TransformFliter {
  def main(args: Array[String]): Unit = {
    val env =StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream:DataStream[Event] = env.fromElements(Event("Mary","./home",1000L),Event("Bob","./cart",2000L))

//1.使用匿名函数
    stream.filter(_.user=="Mary").print("1")
//    2.实现filterFunction接口
stream.filter(new UserFilter).print("2")
  env.execute()
  }
  class UserFilter extends FilterFunction[Event]{
    override def filter(t:Event):Boolean =t.user=="Bob"
  }
}

运行结果
Flink核心API之DataStream(基础常用算子),flink,大数据

【3】扁平映射(flatMap)

flatMap()操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个
一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap()可以认为是“扁平化”
(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,
再对拆分后的元素做转换处理,如图5-7所示。我们此前WordCount程序的第一步分词操作,
就用到了flatMap()。

Flink核心API之DataStream(基础常用算子),flink,大数据
同map()一样,flatMap()也可以使用Lambda表达式或者FlatMapFunction接口实现类的方
式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流的数据类型相同,也
可以不同。
flatMap()的使用非常灵活,可以对结果进行任意输出,下面就是一个例子:

package DataStreamApi

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util

import java.util.stream.Collector

object TransFormFlatMap {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream:DataStream[Event]=env.fromElements(Event("Mary","./home",1000L),Event("Kevin","./root",2000L))

    stream.flatMap(new myFLatMap).print()

env.execute()
  }
  //自定义实现flatMap接口
  class myFLatMap extends FlatMapFunction[Event,String]{
    override def flatMap(t: Event, collector: util.Collector[String]): Unit = {
//    如果当年是marry点击事件,那么直接输出user
      if(t.user=="Mary"){
        collector.collect(t.user)
      }
//        如果是Kevin的点击事件
      else if (t.user=="Kevin"){
        collector.collect(t.user)
        collector.collect(t.url)
      }
    }
    }
  }


打印结果为
Flink核心API之DataStream(基础常用算子),flink,大数据

聚合算子(Aggregation)

1.按键分区(keyBy)

对于Flink而言,DataStream 是没有直接进行聚合的API的。因为我们对海量数据做聚合
肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;
这个操作就是通过keyBy()来完成的。
keyBy()是聚合前必须要用到的一个算子。keyBy()通过指定键(key),可以将一条流从逻
辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对
应着任务槽(task slots)。
基于不同的key,流中的数据将被分配到不同的分区中去,如图5-8所示;这样一来,所
有具有相同的key的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个slot
中进行处理了。
Flink核心API之DataStream(基础常用算子),flink,大数据

import org.apache.flink.streaming.api.scala._ 
object TransKeyBy { 
def main(args: Array[String]): Unit = { 
val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.setParallelism(1) 
val stream = env 
.fromElements( 
Event("Mary", "./home", 1000L), 
Event("Bob", "./cart", 2000L) 
) 
//指定Event的user属性作为key 
val keyedStream = stream.keyBy(_.user) 
keyedStream.print() 
env.execute() 
} 
}

2. 简单聚合

有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们
内置实现了一些最基本、最简单的聚合API,主要有以下几种:
⚫ sum():在输入流上,对指定的字段做叠加求和的操作。
⚫ min():在输入流上,对指定的字段求最小值。
⚫ max():在输入流上,对指定的字段求最大值。
⚫ minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计
算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包
含字段最小值的整条数据。
⚫ maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与
min()/minBy()完全一致。
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;
但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字
段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字
段的名称,是以_1、_2、_3、…来命名的。
例如,下面就是对元组数据流进行聚合的测试:

import org.apache.flink.streaming.api.scala._ 
 
object TransTupleAggregation { 
  def main(args: Array[String]): Unit = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setParallelism(1) 
 
    val stream = env 
      .fromElements( 
        ("a", 1), ("a", 3), ("b", 3), ("b", 4) 
      ) 
 
    stream.keyBy(_._1).sum(1).print() //对元组的索引1位置数据求和 
    stream.keyBy(_._1).sum("_2").print() //对元组的第2个位置数据求和 
    stream.keyBy(_._1).max(1).print() //对元组的索引1位置求最大值 
    stream.keyBy(_._1).max("_2").print() //对元组的第2个位置数据求最大值 
    stream.keyBy(_._1).min(1).print() //对元组的索引1位置求最小值 
    stream.keyBy(_._1).min("_2").print() //对元组的第2个位置数据求最小值 
    stream.keyBy(_._1).maxBy(1).print() //对元组的索引1位置求最大值 
    stream.keyBy(_._1).maxBy("_2").print() //对元组的第2个位置数据求最大值 
    stream.keyBy(_._1).minBy(1).print() //对元组的索引1位置求最小值 
    stream.keyBy(_._1).minBy("_2").print() //对元组的第2个位置数据求最小值 
 
    env.execute() 
  } 
} 
而如果数据流的类型是样例类,那么就只能通过字段名称来指定,不能通过位置来指定了。 
package com.atguigu.chapter05 
 
import org.apache.flink.streaming.api.scala._ 
 
object TransAggregationCaseClass { 
  def main(args: Array[String]): Unit = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setParallelism(1) 
 
    val stream = env.fromElements( 
      Event("Mary", "./home", 1000L), 
      Event("Bob", "./cart", 2000L) 
    ) 
 // 使用user作为分组的字段,并计算最大的时间戳 
    stream.keyBy(_.user).max("timestamp").print() 
 
    env.execute() 
  }}

一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。
所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值
的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,
应该只用在含有有限个key的数据流上。

3. 归约聚合(reduce)

与简单聚合类似,reduce()操作也会将KeyedStream转换为DataStream。它不会改变流的
元素数据类型,所以输出类型和输入类型是一样的。
调用KeyedStream的reduce()方法时,需要传入一个参数,实现ReduceFunction接口。接
口在源码中的定义如下:
public interface ReduceFunction extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}
ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处
理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再
将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据,
这也就是reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果”
作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
下面我们来看一个稍复杂的例子。
我们将数据流按照用户id进行分区,然后用一个reduce()算子实现sum()的功能,统计每
个用户访问的频次;进而将所有统计结果分到一组,用另一个reduce()算子实现maxBy()的功
能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。

package DataStreamApi

import org.apache.flink.api.common.functions._
import org.apache.flink.streaming.api.scala._

object TransFormReduce {
  def main(args: Array[String]): Unit = {
    val env=StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
 val steam=env.fromElements(Event("marry","./portId=1",1000L)
 ,Event("Kevin","./portId=2",2000L)
   ,Event("Bob","./portId=3",3000L)
   ,Event("marry","./portId=1",4000L)
    , Event("marry","./portId=1",6000L)
   ,Event("marry","./portId=1",5000L)
 )
//reduce规约聚合操作
    steam.map(data=>(data.user,1L))
      .keyBy(_._1)
      .reduce(new Mysum) //统计每个用户活跃度
      .keyBy(data=>true) //将所有数据按照同样的key分到同一个组中
      .reduce((state,data) => if (state._2>data._2) state else data)  //选取当前最活跃的用户
      .print()
env.execute()
  }
  class Mysum() extends ReduceFunction[(String,Long)]{
    override def reduce(t: (String, Long), t1: (String, Long)): (String, Long) = (t._1,t._2+1)
  }
}

代码运行结果
Flink核心API之DataStream(基础常用算子),flink,大数据

由此可见 marry的点击量最多

reduce()同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以
我们需要将reduce()算子作用在一个有限key的流上。文章来源地址https://www.toymoban.com/news/detail-815477.html

到了这里,关于Flink核心API之DataStream(基础常用算子)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink-1.17-教程】-【四】Flink DataStream API(1)源算子(Source)

    DataStream API 是 Flink 的核心层 API。一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成: Flink 程序可以在各种上下文环境中运行:我们可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。 不同的环境,代码的提交运行的过程会

    2024年01月22日
    浏览(55)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】

    所谓 “分流” ,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream ,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。 其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选

    2024年01月24日
    浏览(46)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

    用户自定义函数( user-defined function , UDF ),即用户可以根据自身需求,重新实现算子的逻辑。 用户自定义函数分为: 函数类 、 匿名函数 、 富函数类 。 Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction 、 FilterFunction 、 ReduceFunction 等。所

    2024年01月23日
    浏览(46)
  • Flink基础之DataStream API

    union联合:被unioin的流中的数据类型必须一致 connect连接:合并的两条流的数据类型可以不一致 connec后,得到的是ConnectedStreams 合并后需要根据数据流是否经过keyby分区 coConnect: 将两条数据流合并为同一数据类型 keyedConnect 目前所使用的大多数Sink, 都是基于2PC的方式来保证状态

    2024年02月05日
    浏览(39)
  • Flink 读写MySQL数据(DataStream和Table API)

    Flink提供了基于JDBC的方式,可以将读取到的数据写入到MySQL中;本文通过两种方式将数据下入到MySQL数据库,其他的基于JDBC的数据库类似,另外,Table API方式的Catalog指定为Hive Catalog方式,持久化DDL操作。 另外,JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将

    2023年04月09日
    浏览(43)
  • Flink从入门到放弃—Stream API—常用算子(map和flatMap)

    观看本文章之前,请先看之前文章 (十)Flink Datastream API 编程指南 算子-1 (转换算子、物理分区、任务链、资源组 、算子和作业)等基本介绍 常用算子基本使用 本次主要是阅读几个常用算子的源码。 map flatmap keyby filter reduce window windowAll 其它 首先说一下这个算子是one to

    2023年04月08日
    浏览(45)
  • Flink DataStream API CDC同步MySQL数据到StarRocks

    Flink:1.16.1 pom文件如下 Java代码 SourceAndSinkInfo 类,用于定义source和sink的IP、端口、账号、密码信息 DataCenterShine实体类,字段与数据库一一对应。 StarRocksPrimary 实体类 FieldInfo注解类,用于标记字段序号、是否为主键、是否为空,后续生成TableSchema需要使用到。 TableName 注解类,

    2024年02月03日
    浏览(73)
  • Flink学习——DataStream API

            一个flink程序,其实就是对DataStream的各种转换。具体可以分成以下几个部分: 获取执行环境(Execution Environment) 读取数据源(Source) 定义基于数据的转换操作(Transformations) 定义计算结果的输出位置(Sink) 触发程序执行(Execute)         flink 程序可以在各种上

    2024年02月05日
    浏览(38)
  • Flink DataStream API详解

    参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html Data Sources Source是程序读取其输入的位置,您可以使用 env.addSource(sourceFunction) 将Source附加到程序中。Flink内置了许多预先实现的SourceFunction,但是您始终可以通过实现SourceFunction(non-parallel sources)来编写自定

    2024年02月14日
    浏览(39)
  • 【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流 是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流 是一个

    2024年02月06日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包