Flink学习-处理函数

这篇具有很好参考价值的文章主要介绍了Flink学习-处理函数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

处理函数是Flink底层的函数,工作中通常用来做一些更复杂的业务处理,处理函数分好几种,主要包括基本处理函数,keyed处理函数,window处理函数。

Flink提供了8种不同处理函数:

  • ProcessFunction:dataStream
  • KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理
  • CoProcessFunction:用于connect连接的流
  • ProcessJoinFunction:用于join流操作
  • BroadcastProcessFunction:用于广播
  • KeyedBroadcastProcessFunction:keyBy之后的广播
  • ProcessWindowFunction:窗口增量聚合
  • ProcessAllWindowFunction:全窗口聚合
@Public
public interface RichFunction extends Function {
    void open(Configuration var1) throws Exception;

    void close() throws Exception;

    RuntimeContext getRuntimeContext();

    IterationRuntimeContext getIterationRuntimeContext();

    void setRuntimeContext(RuntimeContext var1);
}

基本处理函数(ProcessFunction)

处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。它所对应的函数 类,就叫作 ProcessFunction

处理函数的功能和使用

在很多应 用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。就必须使用处理函数进行实现。

处理函数提供了一个“定时服务” (TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线 (watermark),甚至可以注册“定时事件”。处理函数继承了 AbstractRichFunction 抽象类, 所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函 数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方 法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。

处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法 就可以了。ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunction; 所以所有的处理函数,都是富函数(RichFunction), 富函数可以调用的东西这里同样都可以调用。

PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;

    public ProcessFunction() {
    }

    public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;

    public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
    }

    //
    public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {
        public OnTimerContext() {
            super();
        }

        public abstract TimeDomain timeDomain();
    }
	//
    public abstract class Context {
        public Context() {
        }

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> var1, X var2);
    }
}

processElement会针对流中的每条记录调用一次。跟MapFunction一样,Collector发送出去。

Context可以访问时间戳,当前记录键值以及TimeService,支持将结果发送到副输出。

onTimer() 是一个回调函数,会在之前注册的计数器触发时调用。timestamp 参数给出了所触发计时器的时间戳,Collector可用来发出记录。

OnTimerContext能够提供和processElement()方法中Context对象相同的服务,它还会返回触发计时器的时间域(处理时间/事件时间)。

时间服务和计时器

ContextOnTimerContext对象中TimerService 提供时间相关的数据访问。

PublicEvolving
public interface TimerService {

    /** Error string for {@link UnsupportedOperationException} on registering timers. */
    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";

    /** Error string for {@link UnsupportedOperationException} on deleting timers. */
    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

    /** 返回当前的处理时间。 */
    long currentProcessingTime();

    /** 返回当前水位线时间戳。 */
    long currentWatermark();

    /**
    针对当前键值注册一个处理时间计时器,当执行机器处理时间达到给定的时间戳,该计时器就会触发。
     */
    void registerProcessingTimeTimer(long time);

    /**
     * 针对当前键值注册一个事件时间计时器,当更新后水位线时间戳大于或等于计时器时间戳时,它就会触发。
     */
    void registerEventTimeTimer(long time);

    /**
     * 针对当前键值删除一个注册过的处理时间计时器。如果该计时器不存在,则方法不会有任何作用。
     */
    void deleteProcessingTimeTimer(long time);

    /**
     * 针对当前键值删除一个注册过事件时间计时器,如果该计时器不存在,则方法不会有任何作用。
     */
    void deleteEventTimeTimer(long time);
}

计时器触发时会调用onTimer()回调函数,系统对于processElement()onTimer()两个方法调用同步,防止并发。

每个键值和时间戳只能注册一个计时器,每个键值可以有多个计时器,但具体到每个时间戳就只能有一个。

副输出/侧输出(SideOutput)

大多数DataStream API 算子都只有一个输出,即只能生成一条某个数据类型的结果流。只有split算子可以将一条流拆分成多条类型相同的流。

处理函数提供的副输出功能允许从同一函数发出多条数据流,它们类型可以不同。

按键分区处理函数(KeyedProcessFunction)

按键分区处理函数是重点,用在keyby后面,对keyedStream进行处理,keyby将会按照Key进行分区,然后将不同key的数据分配到不同并行子任务上进行执行。

PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    public abstract class Context {
        public abstract Long timestamp();
        public abstract TimerService timerService();
        public abstract <X> void output(OutputTag<X> outputTag, X value);

        /** Get key of the element being processed. */
        public abstract K getCurrentKey();
    }

    public abstract class OnTimerContext extends Context {
        /** The {@link TimeDomain} of the firing timer. */
        public abstract TimeDomain timeDomain();

        /** Get key of the firing timer. */
        @Override
        public abstract K getCurrentKey();
    }
}

窗口处理函数(ProcessWindowsFunction)

除了上面的按键分区处理函数之外,对于窗口也有函数,分两种,一种是窗口处理函数(ProcessWindowsFunction),另一种是全窗口处理函数(ProcessAllWindowsFunction),ProcessWindowFunction获得一个包含窗口所有元素的可迭代器以及一个具有时间和状态信息访问权的上下文对象,使得它比其他窗口函数提供更大的灵活性。是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到认为窗口可以处理为止。

ProcessWindowsFunction:处理分区数据,每个窗口执行一次process方法.

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
        extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;
    
    public abstract void process(
            KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    public void clear(Context context) throws Exception {}

    /** The context holding window metadata. */
    public abstract class Context implements java.io.Serializable {
        /** Returns the window that is being evaluated. */
        public abstract W window();
        public abstract long currentProcessingTime();
        public abstract long currentWatermark();
        public abstract KeyedStateStore windowState();
        public abstract KeyedStateStore globalState();
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}

全窗口处理函数(ProcessAllWindowFunction)

ProcessAllWindowFunctionProcessFunction类相似,都是用来对上游过来的元素做处理,不过ProcessFunction是每个元素执行一次processElement方法,ProcessAllWindowFunction是每个窗口执行一次process方法(方法内可以遍历该窗口内的所有元素);

public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window>
        extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;
    public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out)
            throws Exception;

    public void clear(Context context) throws Exception {}

    /** The context holding window metadata. */
    public abstract class Context {
        public abstract W window();

        public abstract KeyedStateStore windowState();

        public abstract KeyedStateStore globalState();
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}

合并流处理函数(CoProcessFunction)

对于连接流ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)。与 CoMapFunction 类似,如果是调用.flatMap()就需要传入一个 CoFlatMapFunction,需要实现 flatMap1()flatMap2()两个方法;而调用.process()时,传入的则是一个 CoProcessFunction

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;
    public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out)
            throws Exception;
    public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out)
            throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}


    public abstract class Context {

        public abstract Long timestamp();

        /** A {@link TimerService} for querying time and registering timers. */
        public abstract TimerService timerService();
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
    public abstract class OnTimerContext extends Context {
        public abstract TimeDomain timeDomain();
    }
}

连接流处理函数(ProcessJoinFunction)

ProcessJoinFunctionCoProcessFunction类似,但是有区别。

ProcessJoinFunction 用于join流操作,可以拿到两个流数据处理

CoProcessFunction 用于连接流处理,两个流数据分别处理

public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = -2444626938039012398L;
    public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out)
            throws Exception;
    public abstract class Context {
        public abstract long getLeftTimestamp();
        public abstract long getRightTimestamp();
        public abstract long getTimestamp();
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}

广播流处理函数(BroadcastProcessFunction)

广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流” BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    private static final long serialVersionUID = 8352559162119034453L;
  
    public abstract void processElement(
            final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;

 
    public abstract void processBroadcastElement(
            final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;

 
    public abstract class Context extends BaseBroadcastProcessFunction.Context {}
 
    public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {}
}

按键分区的广播连接流处理函数(KeyedBroadcastProcessFunction)

按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流, 是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。

附录

参考

Flink官方文档文章来源地址https://www.toymoban.com/news/detail-778243.html

到了这里,关于Flink学习-处理函数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink处理函数(2)—— 按键分区处理函数

     按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作 定时器(timers)是处理函数中进行时间相关操作的主要机制 定时服务(TimerService)提供了注册定时器的功能 TimerService 是 Flink 关于时间和定时器的基础服务接口: 六个方法可以分成两大类:基于处理时

    2024年01月21日
    浏览(43)
  • Flink之常用处理函数

    处理函数(Processing Function)是Apache Flink中用于对数据流上的元素进行处理的核心组件之一。处理函数负责定义数据流上的数据如何被处理,允许开发人员编写自定义逻辑以执行各种操作,如转换、聚合、筛选、连接等,并在处理后生成输出数据流。 对于数据流,都可以直接

    2024年02月07日
    浏览(35)
  • Flink处理函数(一)

    目录  7.1 基本处理函数(ProcessFunction) 7.1.1 处理函数的功能和使用 7.1.2 ProcessFunction 解析 7.1.3 处理函数的分类 7.2 按键分区处理函数(KeyedProcessFunction) 7.2.1 定时器(Timer)和定时服务(TimerService) 7.2.2 KeyedProcessFunction 的使用 7.3 窗口处理函数 7.3.1 窗口处理函数的使用 7.3.

    2024年02月03日
    浏览(35)
  • flink处理函数--副输出功能

    在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出 本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出

    2024年02月07日
    浏览(43)
  • Flink处理函数解析(ProcessFunction和KeyedProcessFunction)

    Flink中的处理函数(ProcessFunction和KeyedProcessFunction)在对于数据进行颗粒化的精确计算时使用较多,处理函数提供了一个定时服务(TimerService),可以向未来注册一个定时服务,我们可以把它理解为一个闹钟,当闹钟响起时,就调用ProcessFunction中的onTimer()方法,会对数据进行一

    2024年02月04日
    浏览(35)
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

    学习文档:概念透析 - 及时流处理 学习笔记如下: 及时流处理时有状态流处理的扩展,其中时间在计算中起着一定的作用。 及时流的应用场景: 时间序列分析 基于特定时间段进行聚合 对发生时间很重要的事件进行处理 处理时间(processing time) 处理时间的即数据到达各个

    2024年02月03日
    浏览(50)
  • 大数据Flink(五十三):Flink流处理特性、发展历史以及Flink的优势

    文章目录 Flink流处理特性、发展历史以及Flink的优势 一、Flink流处理特性 二、发展历史

    2024年02月14日
    浏览(54)
  • 初探Flink的Java实现流处理和批处理

    端午假期,夏日炎炎,温度连续40度以上,在家学习Flink相关知识,记录下来,方便备查。 开发工具 :IntelliJ Idea Flink版本 :1.13.0 本次主要用Flink实现 批处理 (DataSet API) 和 流处理 (DataStream API)简单实现。 第一步、创建项目与添加依赖 1)新建项目 打开Idea,新建Maven项目

    2024年02月10日
    浏览(49)
  • Flink:处理大规模复杂数据集的最佳实践深入探究Flink的数据处理和性能优化技术

    作者:禅与计算机程序设计艺术 随着互联网、移动互联网、物联网等新型网络技术的不断发展,企业对海量数据的处理日益依赖,而大数据分析、决策支持、风险控制等领域都需要海量的数据处理能力。如何高效、快速地处理海量数据、提升处理效率、降低成本,是当下处理

    2024年02月13日
    浏览(56)
  • 大数据Flink(六十一):Flink流处理程序流程和项目准备

    文章目录 Flink流处理程序流程和项目准备 一、Flink流处理程序的一般流程

    2024年02月11日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包