Flink的KeyedProcessFunction基于Event Time和Process Time的定时器用法实例分析

这篇具有很好参考价值的文章主要介绍了Flink的KeyedProcessFunction基于Event Time和Process Time的定时器用法实例分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

FLink处理函数简介

在Flink底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的【处理】(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作【处理函数】(process function)。在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。

Flink几种处理函数简介

  1. ProcessFunction是用于处理数据流的通用函数。它是一个抽象类,定义了处理数据流的常用方法,如processElement,onTimer等。您可以扩展ProcessFunction类并重写这些方法,以便在Flink程序中执行复杂的数据流处理逻辑。
  2. KeyedProcessFunction是ProcessFunction的特殊类型,用于处理带有键的数据流。它定义了额外的方法,如getKey,context.timerService()等,用于访问数据流中每个元素的键以及在处理函数中安排定时器。
  3. ProcessWindowFunction和ProcessAllWindowFunction是用于处理时间窗口的特殊函数。它们提供了一个process方法,用于在每个窗口中对数据进行处理。ProcessWindowFunction接受带有键的数据流,并且每个窗口都对应于一个键,而ProcessAllWindowFunction接受不带键的数据流,并且每个窗口都包含整个数据流。

这里重点介绍KeyedProcessFunction,KeyedProcessFunction是用来处理KeyedStream的。每有一个数据进入算子,则会触发一次processElement()的处理。它还提供了定时器的功能,在在预警、监控等场景特定场景下,非常适合。
KeyedProcessFunction定时器包分为两种:基于事件时间、基于处理时间。下面以统计计数的方式展示这两种定时器的用法,并附上详细的分析思路。以下用例基于Flink1.14

实例分析

KeyedProcessFunction基于事件时间的定时器

代码:


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

/**
 * @description:
 *
 * @author pony
 * @date 2024/1/17 20:55
 * @version 1.0
 * nc -l 9999
 */
public class KeyedProcessFunctionOnTimerEventTime {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        return Long.valueOf(element.split(",")[1]);
                    }
                })
                .withIdleness(Duration.ofSeconds(1));

        DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));
                    }
                });

        // apply the process function onto a keyed stream
        DataStream<Tuple2<String, Long>> result = stream0
                .keyBy(value -> value.f0)
                .process(new CountEventTimeWithTimeoutFunction());

        result.print();

        env.execute("KeyedProcessFunction wordCount");
    }

    /**
     * The implementation of the ProcessFunction that maintains the count and timeouts
     */
    static class CountEventTimeWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

        private ValueState<Long> state;
        private static final Integer DELAY = 1000; //1s

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
        }

        @Override
        public void processElement(
                Tuple2<String, Long> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            Long current = state.value();
            if (current == null) {
                current = 0L;
            }
            current++;
            state.update(current);
            //获取当前数据流的水位线
            long currentWatermark = ctx.timerService().currentWatermark();

//            long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAY
            long timer = currentWatermark + DELAY;//设置定时器的时间为当前水位线+DELAY
            //注册事件时间定时器,与watermark绑定,必须满足条件: watermark >= timer 来触发特定event的定时器
            ctx.timerService().registerEventTimeTimer(timer);

            //删除事件时间定时器
            if (currentWatermark < 0) {
                ctx.timerService().deleteEventTimeTimer(timer);
            }

            System.out.println("last Watermark: " + currentWatermark + ", format: " + time(currentWatermark));

            // 打印信息,用于核对数据
            System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",
                    ctx.getCurrentKey(),
                    current,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timer,
                    time(timer)));

        }

        @Override
        public void onTimer(
                long timestamp, //定时器触发时间,等于以上的timer
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // 取得当前单词
            String currentKey = ctx.getCurrentKey();
            // get the state for the key that scheduled the timer
            Long result = state.value();

            // 打印数据,用于核对是否符合预期
            System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",
                    currentKey,
                    result,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timestamp,
                    time(timestamp)));
            System.out.println("current Watermark: " + ctx.timerService().currentWatermark() + ", format: " + time(ctx.timerService().currentWatermark()));
            
            out.collect(new Tuple2<String, Long>(currentKey, result));

        }

        @Override
        public void close() throws Exception {
            super.close();
            state.clear();
        }
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));
    }
}

测试数据:

nc -l 9999
a1,1704038400000
a1,1704038401000
a1,1704038403000

运行结果:
Flink的KeyedProcessFunction基于Event Time和Process Time的定时器用法实例分析,flink,flink,大数据

KeyedProcessFunction基于处理时间的定时器

代码:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

/**
 * @description:
 *
 * @author pony
 * @date 2024/1/17 20:55
 * @version 1.0
 * nc -l 9999
 */
public class KeyedProcessFunctionOnTimerProcessTime {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
//                        return System.currentTimeMillis();
                        return Long.valueOf(element.split(",")[1]);
                    }
                })
                .withIdleness(Duration.ofSeconds(1));

        DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));
                    }
                });

        // apply the process function onto a keyed stream
        DataStream<Tuple2<String, Long>> result = stream0
                .keyBy(value -> value.f0)
                .process(new CountProcessTimeWithTimeoutFunction());

        result.print();

        env.execute("KeyedProcessFunction wordCount");
    }

    static class CountProcessTimeWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

        private ValueState<Long> state;
        private static final Integer DELAY = 60 * 1000; //1s

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
        }

        @Override
        public void processElement(
                Tuple2<String, Long> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            Long current = state.value();
            if (current == null) {
                current = 0L;
            }
            current++;
            state.update(current);

            long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAY
            //注册处理时间定时器, 与watermark无关,定时器触发条件:当前系统时间>timer
            ctx.timerService().registerProcessingTimeTimer(timer);
            //删除处理时间定时器
//            ctx.timerService().deleteProcessingTimeTimer(timer);

            System.out.println("processElement currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));
            // 打印所有信息,用于核对数据
            System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",
                    ctx.getCurrentKey(),
                    current,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timer,
                    time(timer)));
        }

        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // 取得当前单词
            String currentKey = ctx.getCurrentKey();
            // get the state for the key that scheduled the timer
            Long result = state.value();

            System.out.println("onTimer currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));
            // 打印数据,用于核对是否符合预期
            System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",
                    currentKey,
                    result,
                    ctx.timestamp(),
                    time(ctx.timestamp()),
                    timestamp,
                    time(timestamp)));

            //另外还支持侧流
            OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("single"){};
            if (result < 2) {
                ctx.output(outputTag, new Tuple2<>(currentKey, result));
            } else {
                out.collect(new Tuple2<String, Long>(currentKey, result));
            }

        }

        @Override
        public void close() throws Exception {
            super.close();
            state.clear();
        }
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));
    }
}

测试数据:

nc -l 9999
a,1705568024000    
a,1705568024000

运行结果:
Flink的KeyedProcessFunction基于Event Time和Process Time的定时器用法实例分析,flink,flink,大数据

总结

在真实业务场景中【 KeyedProcessFunction基于处理时间的定时器】用的比较多,比较符合业务场景,即根据事件的时间来指定处理时间去定时触发定时器。因此在此场景中,可以不指定watermarkStrategy,可以获取传输参数的时间时间来定时触发定时器。

参考:
Process Function
Generating Watermarks文章来源地址https://www.toymoban.com/news/detail-806306.html

到了这里,关于Flink的KeyedProcessFunction基于Event Time和Process Time的定时器用法实例分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月24日
    浏览(30)
  • Flink timer定时器

    常见timer 基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext 所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registerProcessingTimeTimer(timestamp: Lon

    2024年02月07日
    浏览(38)
  • flink中使用外部定时器实现定时刷新

    我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢? 1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,

    2024年02月06日
    浏览(34)
  • rtc定时器配置ioctl 设置 RTC_AIE_OFF、RTC_RD_TIME、RTC_ALM_SET、RTC_AIE_ON

    要设置 RTC 定时器以及相关标志,您需要使用 Linux 的 RTC 设备接口( /dev/rtc )。下面是一个示例代码,演示了如何使用 rtc 设备接口设置 rtc 定时器及相关标志: 这段代码打开了 /dev/rtc 设备文件,并使用 RTC_RD_TIME 命令获取当前 RTC 时间。然后,它设置了一个 RTC 定时器,使之

    2024年01月17日
    浏览(34)
  • 大数据-玩转数据-Flink定时器

    基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registerProcessingTimeTimer(timestamp: Long): Unit 会注

    2024年02月10日
    浏览(28)
  • [AIGC] 深入理解Flink中的窗口、水位线和定时器

    Apache Flink是一种流处理和批处理的混合引擎,它提供了一套丰富的APIs,以满足不同的数据处理需求。在本文中,我们主要讨论Flink中的三个核心机制:窗口(Windows)、水位线(Watermarks)和定时器(Timers)。 在流处理应用中,一种常见的需求是计算某个时间范围内的数据,这

    2024年03月27日
    浏览(46)
  • Why choose Flink for real-time processing

    Why choose Flink [1] Streaming data more truly reflects our lifestyle (real-time chat); [2] Traditional data architecture is based on limited data sets (Spark is based on micro-batch data processing); [3] Our goal: low latency, high throughput (distributed architecture, there may be confusion in the order, for example, within 1 hour of statistics, some data

    2024年03月20日
    浏览(40)
  • 一文弄明白KeyedProcessFunction函数

    KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProcessFunction的注解如下 上面简单来说就是以下四点 Flink中输

    2024年02月22日
    浏览(35)
  • Flink(七)Flink四大基石之Time和WaterMaker详解与详细示例(watermaker基本使用、kafka作为数据源的watermaker使用示例以及超出最大允许延迟数据的接收实现)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月15日
    浏览(37)
  • 7、Flink四大基石之Time和WaterMaker详解与详细示例(watermaker基本使用、kafka作为数据源的watermaker使用示例以及超出最大允许延迟数据的接收实现)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月14日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包