Flink处理函数(2)—— 按键分区处理函数

这篇具有很好参考价值的文章主要介绍了Flink处理函数(2)—— 按键分区处理函数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作

1.定时器(Timer)和定时服务(TimerService)

  • 定时器(timers)是处理函数中进行时间相关操作的主要机制
  • 定时服务(TimerService)提供了注册定时器的功能

TimerService 是 Flink 关于时间和定时器的基础服务接口:

// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线(事件时间)
long currentWatermark();
// 注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为 time 的事件时间定时器
void deleteEventTimeTimer(long time);

六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器

尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间

对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳保存起来,排队等待执行;可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的;利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次:

long coalescedTime = time / 1000 * 1000; //时间戳(定时器默认的区分精度是毫秒)
ctx.timerService().registerProcessingTimeTimer(coalescedTime); //注册定时器

2.KeyedProcessFunction 的使用

基础用法:

stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())

这里的MyKeyedProcessFunction即是KeyedProcessFunction的一个实现类;

源码解析


KeyedProcessFunction源码如下:

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /**
     * Process one element from the input stream.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter and
     * also update internal state or set timers using the {@link Context} parameter.
     *
     * @param value The input value.
     * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
     *     {@link TimerService} for registering timers and querying the time. The context is only
     *     valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link
     *     TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for
     *     registering timers and querying the time. The context is only valid during the invocation
     *     of this method, do not store it.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    /**
     * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
     * or {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class Context {

        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program is
         * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();

        /** A {@link TimerService} for querying time and registering timers. */
        public abstract TimerService timerService();

        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);

        /** Get key of the element being processed. */
        public abstract K getCurrentKey();
    }

    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /** The {@link TimeDomain} of the firing timer. */
        public abstract TimeDomain timeDomain();

        /** Get key of the firing timer. */
        @Override
        public abstract K getCurrentKey();
    }
}

可以看到和ProcessFunction类似,都有一个processElement()onTimer()方法,并且定义了一个Context抽象类;不同点在于类型参数多了一个K,也就是key的类型;

代码示例

①处理时间语义

public class ProcessingTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 处理时间语义,不需要分配时间戳和watermark
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());

        // 要用定时器,必须基于KeyedStream
        stream.keyBy(data -> true)
                .process(new KeyedProcessFunction<Boolean, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        Long currTs = ctx.timerService().currentProcessingTime();
                        out.collect("数据到达,到达时间:" + new Timestamp(currTs));
                        // 注册一个10秒后的定时器
                        ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));
                    }
                })
                .print();

        env.execute();
    }
}

通过ctx.timerService().currentProcessingTime()获取当前处理时间;

通过ctx.timerService().registerProcessingTimeTimer来设置一个定时器;

运行结果如下:

Flink处理函数(2)—— 按键分区处理函数,大数据,Flink,flink,大数据

由于定时器是处理时间的定时器,不用考虑水位线延时问题,因此10s后能够准时触发定时操作;


②事件时间语义:

public class EventTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        // 基于KeyedStream定义事件时间定时器
        stream.keyBy(data -> true)
                .process(new KeyedProcessFunction<Boolean, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("数据到达,时间戳为:" + ctx.timestamp());
                        out.collect("数据到达,水位线为:" + ctx.timerService().currentWatermark() + "\n -------分割线-------");
                        // 注册一个10秒后的定时器
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发,触发时间:" + timestamp);
                    }
                })
                .print();

        env.execute();
    }

    // 自定义测试数据源
    public static class CustomSource implements SourceFunction<Event> {
        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            // 直接发出测试数据
            ctx.collect(new Event("Mary", "./home", 1000L));
            // 为了更加明显,中间停顿5秒钟
            Thread.sleep(5000L);

            // 发出10秒后的数据
            ctx.collect(new Event("Mary", "./home", 11000L));
            Thread.sleep(5000L);

            // 发出10秒+1ms后的数据
            ctx.collect(new Event("Alice", "./cart", 11001L));
            Thread.sleep(5000L);
        }

        @Override
        public void cancel() { }
    }
}

运行结果如下:

Flink处理函数(2)—— 按键分区处理函数,大数据,Flink,flink,大数据

运行结果解释:

①第一条数据到来时,时间戳为1000,但由于水位线的生成是周期性的(默认200ms),因此水位线不会立即发送改变,仍然是Long.MIN_VALUE,之后只要到了水位线生成的时间周期,就会依据当前最大的时间戳来生成水位线(默认减1)

②第二条数据到来时,显然水位线已经推进到了999,但仍然不会立即改变;

③在事件时间语义下,定时器触发的条件就是水位线推进到设定的时间;第一条数据到来之后,设定的定时器时间为11000,而当时间戳为11000的数据到来时,水位线还停留在999的位置,因此不会立即触发定时器;之后水位线会推进到10999(11000-1),同样无法触发定时器;

④第三条数据到来时,时间戳为11001,此时水位线推进到了10999,等到水位线周期性更新后,推进到11000(11001-1),这样第一个定时器就会触发

⑤然后等待5s后,没有新的数据到来,整个程序结束,将要退出,此时会将水位线推进到Long.MAX_VALUE,所以所有没有触发的定时器统一触发;

 学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili文章来源地址https://www.toymoban.com/news/detail-811163.html

到了这里,关于Flink处理函数(2)—— 按键分区处理函数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink处理函数(3)—— 窗口处理函数

    窗口处理函数包括:ProcessWindowFunction 和 ProcessAllWindowFunction 基础用法 这里的 MyProcessWindowFunction 就是 ProcessWindowFunction 的一个实现类; ProcessWindowFunction 是一个典型的全窗口函数,把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理 源码解析 类型参数如下: IN:i

    2024年01月20日
    浏览(43)
  • Flink 处理函数(1)—— 基本处理函数

    在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以 自定义处理逻辑 在处理函数中,我们直面的就是数据流中最基本的元素: 数据事件(event)、状态(state)以及时间(time) 。这就相当于 对流有了完全的控制权 基本处理函数主要是定

    2024年01月18日
    浏览(41)
  • 24、Flink 的table api与sql之Catalogs(java api操作分区与函数、表)-4

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月08日
    浏览(49)
  • Flink学习-处理函数

    处理函数是Flink底层的函数,工作中通常用来做一些更复杂的业务处理,处理函数分好几种,主要包括基本处理函数,keyed处理函数,window处理函数。 Flink提供了8种不同处理函数: ProcessFunction :dataStream KeyedProcessFunction :用于KeyedStream,keyBy之后的流处理 CoProcessFunction :用于

    2024年02月03日
    浏览(37)
  • Flink之常用处理函数

    处理函数(Processing Function)是Apache Flink中用于对数据流上的元素进行处理的核心组件之一。处理函数负责定义数据流上的数据如何被处理,允许开发人员编写自定义逻辑以执行各种操作,如转换、聚合、筛选、连接等,并在处理后生成输出数据流。 对于数据流,都可以直接

    2024年02月07日
    浏览(37)
  • Flink处理函数(一)

    目录  7.1 基本处理函数(ProcessFunction) 7.1.1 处理函数的功能和使用 7.1.2 ProcessFunction 解析 7.1.3 处理函数的分类 7.2 按键分区处理函数(KeyedProcessFunction) 7.2.1 定时器(Timer)和定时服务(TimerService) 7.2.2 KeyedProcessFunction 的使用 7.3 窗口处理函数 7.3.1 窗口处理函数的使用 7.3.

    2024年02月03日
    浏览(35)
  • flink处理函数--副输出功能

    在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出 本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出

    2024年02月07日
    浏览(44)
  • 《Flink学习笔记》——第七章 处理函数

    为了让代码有更强大的表现力和易用性,Flink 本身提供了多层 API 在更底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口

    2024年02月10日
    浏览(51)
  • Flink处理函数解析(ProcessFunction和KeyedProcessFunction)

    Flink中的处理函数(ProcessFunction和KeyedProcessFunction)在对于数据进行颗粒化的精确计算时使用较多,处理函数提供了一个定时服务(TimerService),可以向未来注册一个定时服务,我们可以把它理解为一个闹钟,当闹钟响起时,就调用ProcessFunction中的onTimer()方法,会对数据进行一

    2024年02月04日
    浏览(37)
  • Flink学习——处理函数ProcessFunction及多流转换

            在DataStream的更底层,我们可以不定义任何具体的算子(如map(),filter()等)二只提炼出一个统一的“处理”(process)操作 。它是所有转换算子的概括性的表达。可以自定义处理逻辑。         所以这一层接口就被叫做“ 处理函数 ”( process function )         处理

    2024年02月14日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包