一文弄明白KeyedProcessFunction函数

这篇具有很好参考价值的文章主要介绍了一文弄明白KeyedProcessFunction函数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

引言

KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧

正文

了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProcessFunction的注解如下

/**
 * A keyed function that processes elements of a stream.
 *
 * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is
 * invoked. This can produce zero or more elements as output. Implementations can also query the
 * time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,
 * OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as
 * output and register further timers.
 *
 * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
 * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.
 *
 * <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link
 * org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link
 * org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown
 * methods can be implemented. See {@link
 * org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
 * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
 */

上面简单来说就是以下四点

  1. Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
  2. 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
  3. 由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
  4. 需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面

processElement方法解析

  1. Flink会调用processElement方法处理输入流中的每一条数据
  2. KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state
  3. 这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据

onTimer方法解析:在启用TimerService服务时会定时触发此方法,一般会在processElement方法中开启TimerService服务

以上就是这个函数的基本知识,接下来就通过实战来熟悉下它的使用

实战简介

本次实战的目标是学习KeyedProcessFunction,内容如下:

  1. 监听本机7777端口读取字符串
  2. 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1
  3. 将Tuple2实例集合通过f0字段分区,得到KeyedStream
  4. KeyedSteam通过自定义KeyedProcessFunction处理
  5. 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器进行触发

使用代码例子

首先定义pojo类

public class CountWithTimestampNew {

    private String key;

    private long count;

    private long lastQuestTimestamp;

    public long getAndIncrementCount() {
        return ++count;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public long getLastQuestTimestamp() {
        return lastQuestTimestamp;
    }

    public void setLastQuestTimestamp(long lastQuestTimestamp) {
        this.lastQuestTimestamp = lastQuestTimestamp;
    }
}

接着实现KeyedProcessFunction类

public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {

    private ValueState<CountWithTimestampNew> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));
    }

    // 实现数据处理逻辑的地方
    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Tuple currentKey = ctx.getCurrentKey();

        CountWithTimestampNew countWithTimestampNew = state.value();
        if (countWithTimestampNew == null) {
            countWithTimestampNew = new CountWithTimestampNew();
            countWithTimestampNew.setKey(value.f0);
        }

        countWithTimestampNew.getAndIncrementCount();

        //更新这个单词最后一次出现的时间
        countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());

        //单词之间不会互相覆盖吗?推测state对象是跟key绑定,针对每一个不同的key KeyedProcessFunction会创建其对应的state对象
        state.update(countWithTimestampNew);

        //给当前单词创建定时器,十秒后触发
        long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;

        //尝试注释掉看看是否还会触发onTimer方法
        ctx.timerService().registerProcessingTimeTimer(timer);

        //打印所有信息,用于确保数据准确性
        System.out.println(String.format(" 触发processElement方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s",
                    currentKey.getField(0),
                    countWithTimestampNew.getCount(),
                    time(countWithTimestampNew.getLastQuestTimestamp()),
                    time(timer)
                ));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {

        Tuple currentKey = ctx.getCurrentKey();

        CountWithTimestampNew countWithTimestampNew = state.value();

        //标记当前元素是否已经连续10s未出现
        boolean isTimeout = false;

        if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {
            //out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));
            isTimeout = true;
        }

        //打印所有信息,用于确保数据准确性
        System.out.println(String.format(" 触发onTimer方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s",
                currentKey.getField(0),
                countWithTimestampNew.getCount(),
                time(countWithTimestampNew.getLastQuestTimestamp()),
                time(timestamp),
                String.valueOf(isTimeout)
        ));
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
    }
}

最后是启动类

public class KeyedProcessFunctionDemo2 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度1
        env.setParallelism(1);

        // 处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // 监听本地9999端口,读取字符串
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);

        // 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到
        DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
                // 对收到的字符串用空格做分割,得到多个单词
                .flatMap(new SplitterFlatMapFunction())
                // 设置时间戳分配器,用当前时间作为时间戳
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        // 使用当前系统时间作为时间戳
                        return System.currentTimeMillis();
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        // 本例不需要watermark,返回null
                        return null;
                    }
                })
                // 将单词作为key分区
                .keyBy(0)
                // 按单词分区后的数据,交给自定义KeyedProcessFunction处理
                .process(new CountWithTimeoutKeyProcessFunctionNew());

        // 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来
        timeOutWord.print();

        env.execute("ProcessFunction demo : KeyedProcessFunction");
    }
}

演示

在启动服务前,先通过linux指令监听端口 nc -lk 7777

  1. 启动Flink服务后,往7777端口里面发送数据
    一文弄明白KeyedProcessFunction函数,Flink,Flink算子

  2. 通过IDEA的终端可以看到有日志输出,可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志
    一文弄明白KeyedProcessFunction函数,Flink,Flink算子

  3. 那么咱们尝试连续发送两条Hello呢,可以看到累加器会持续累加,并且会触发两次onTimer方法,也就是每一条消息都会触发一次。由于连续发送两条,因此可以看得到第三行日志的末尾是false,说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来
    一文弄明白KeyedProcessFunction函数,Flink,Flink算子

  4. 再输入点其他的试试
    一文弄明白KeyedProcessFunction函数,Flink,Flink算子

  5. 通过输出可以看到这些单词的计数器又从0开始,说明每一个Key都对应一个状态
    一文弄明白KeyedProcessFunction函数,Flink,Flink算子

思考题文章来源地址https://www.toymoban.com/news/detail-836129.html

  1. open方法会在哪里进行调用,KeyedProcessFunction整个类的完整调用逻辑是怎么样的
  2. registerProcessingTimeTimer和registerEventTimeTimer的差异是什么

参考资料

  1. https://blog.csdn.net/boling_cavalry/article/details/106299167
  2. https://blog.csdn.net/lujisen/article/details/105510532
  3. https://blog.csdn.net/qq_31866793/article/details/102831731

到了这里,关于一文弄明白KeyedProcessFunction函数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 通过5个概念 一文弄明白DAO

    深入研究“什么是DAO ”的问题,并提出5个不同的概念视角,使我们能够更有效的用于对现有组织机构的协调和研究。  摘要 这5个不同的概念分别: 作为组织集体的DAO DAO作为一种多代理系统,具有相互关联的决策实例,其中代理将决策归为一个集体实体(DAO)),用来声明、

    2024年02月03日
    浏览(35)
  • 一文搞明白STM32芯片存储结构

            本篇介绍STM32芯片的存储结构,ARM公司负责提供设计内核,而其他外设则为芯片商设计并使用,ARM收取其专利费用而不参与其他经济活动,半导体芯片厂商拿到内核授权后,根据产品需求,添加各类组件,生产芯片售卖。图1为STM32的组成示意图,其中Cortex-M3内核、

    2024年02月14日
    浏览(38)
  • Flink源算子、转换算子和输出算子(DataSet)

    Flink是一种一站式处理的框架,既可以进行批处理(DataSet),也可以进行流处理(DataStream) 将Flink的算子分为两大类:DataSet 和 DataStream 1.1 fromCollection 从本地集合读取数据 1.2 readTextFile 从文件中读取 1.3 readTextFile 遍历目录 对一个文件目录内的所有文件,包括所有子目录中的

    2024年04月23日
    浏览(37)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(2)转换算子(Transformation)【基本转换算子、聚合算子】

    数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。 map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个 “一 一映射”,消费一个元素就产出一个元素 。 我们只

    2024年01月23日
    浏览(48)
  • [flink 实时流基础]源算子和转换算子

    Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。 在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方

    2024年04月11日
    浏览(43)
  • 【明解STM32】一文搞明白STM32芯片存储结构

    目录 一、前言 二、内核存储结构 三、芯片存储映射 四、总结         本篇介绍STM32芯片的存储结构,ARM公司负责提供设计内核,而其他外设则为芯片商设计并使用,ARM收取其专利费用而不参与其他经济活动,半导体芯片厂商拿到内核授权后,根据产品需求,添加各类组

    2024年02月16日
    浏览(39)
  • Spring-WebFlux使用,一文带你从0开始学明白Spring-WebFlux,学明白响应式编程

    传统的基于Servlet的Web框架,如Spring MVC,在本质上都是阻塞和多线程的,每个连接都会使用一个线程。在请求处理的时候,会在线程池中拉取一个工作者( worker )线程来对请求进行处理。同时,请求线程是阻塞的,直到工作者线程提示它已经完成为止。 在Spring5中,引入了一个新

    2024年02月03日
    浏览(39)
  • 【玩转Linux操作】一文带你明白Shell的判断,循环语句

    🎊专栏【玩转Linux操作】 🍔喜欢的诗句:更喜岷山千里雪 三军过后尽开颜。 🎆音乐分享【如愿】 大一同学小吉,欢迎并且感谢大家指出我的问题🥰 注意写空格 if … then 形式 类似于C/C++里面的 if-else 语句 🎈示例 🎈示例 🎈示例 类似于C/C++的 switch 语句 🎈示例 注意写空

    2024年02月13日
    浏览(39)
  • Flink之窗口聚合算子

    1.窗口聚合算子 在Flink中窗口聚合算子主要分类两类 滚动聚合算子(增量聚合) 全窗口聚合算子(全量聚合) 1.1 滚动聚合算子 滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下: aggregate max maxBy

    2024年02月07日
    浏览(43)
  • Flink基础概念-算子

    Flink的核心目标,是\\\"数据流上的有状态计算\\\"。 具体说明:ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。 无界数据流 例如从Kafka这样的消息组件中读取的数据一般,没有数据流结束的定义,即使没有数据也在进行消费。 有界数据流 有界数据

    2024年02月03日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包