二次开发Flink-coGroup算子支持迟到数据通过测输出流提取

这篇具有很好参考价值的文章主要介绍了二次开发Flink-coGroup算子支持迟到数据通过测输出流提取。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1.背景

2.coGroup算子源码分析

2.1完整的coGroup算子调用流程

2.2coGroup方法入口

2.3 CoGroupedStreams对象分析

2.4WithWindow内部类分析

2.5CoGroupWindowFunction函数分析

3.修改源码支持获取迟到数据测输出流

3.1复制CoGroupedStreams

3.2新增WithWindow.sideOutputLateData方法

3.3新增WithWindow构造方法

3.4修改apply方法

3.5开放UnionTypeInfo类的public权限

​3.6编译Flink源码flink-streaming-java模块

3.7项目中查看maven是否已经刷新为最新代码

4.测试


1.背景

coGroup算子开窗到时间关闭之后,迟到数据无法通过测输出流提取,intervalJoin算子提供了api,因为join算子底层就是coGroup算子,所以Join算子也不行。

flink版本 v1.17.1

2.coGroup算子源码分析

2.1完整的coGroup算子调用流程

    input1.coGroup(input2)
    .where(keySelector1)
    .equalTo(keySelector2)
    .window(windowAssigner)
    .trigger(trigger)
    .evictor(evictor)
    .allowedLateness(allowedLateness)
    .apply(cgroupFunction)

通过上述代码可以看到没有sideOutputLateData的相关方法,用来提取窗口关闭之后的迟到数据

2.2coGroup方法入口

其中创建了一个CoGroupedStreams流对象

    /**
     * Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys and
     * window can be specified.
     */
    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams<>(this, otherStream);
    }

2.3 CoGroupedStreams对象分析

他可以理解为构造设计模式的一个Builder类,通过where方法配置第一条流的KeySelector,再返回一个CoGroupedStreams的内部类Where,再通过equalTo方法配置第二条流的KeySelector,再返回EqualTo内部类,window方法配置窗口划分器,返回WithWindow内部类,后续都是窗口的配置 trigger,evictor,allowedLateness配置窗口参数,最后调用apply方法传送用户业务函数

2.4WithWindow内部类分析

WithWindow是最终保存所有配置的内部类包括两条流,窗口配置,key提取器的配置,最终会用户调用apply方法触发CoGroup的业务,在apply方法中通过union联合两条流,然后通过keyby转为KeyedStream,再通过window配置窗口,最终调用窗口函数的apply方法,传入WindowFunction,做CoGroup的业务与用户业务。

具体代码如下已写好备注


    /**
     * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
     * well as a {@link WindowAssigner}.
     *
     * @param <T1> Type of the elements from the first input
     * @param <T2> Type of the elements from the second input
     * @param <KEY> Type of the key. This must be the same for both inputs
     * @param <W> Type of {@link Window} on which the co-group operation works.
     */
    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        //第一条流
        private final DataStream<T1> input1;
        //第二条流
        private final DataStream<T2> input2;
        //第一个key提取器
        private final KeySelector<T1, KEY> keySelector1;
        //第二个Key提取器
        private final KeySelector<T2, KEY> keySelector2;
        //Key的类型
        private final TypeInformation<KEY> keyType;
        //窗口分配器
        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
        //窗口出发计算器
        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

        private final Time allowedLateness;

        private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;
        //构造函数给上面对象赋值
        protected WithWindow(
                DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {
            this.input1 = input1;
            this.input2 = input2;

            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
            this.keyType = keyType;

            this.windowAssigner = windowAssigner;
            this.trigger = trigger;
            this.evictor = evictor;

            this.allowedLateness = allowedLateness;
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p>Note: This method's return type does not support setting an operator-specific
         * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
         * parallelism.
         */
        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);
			//创建合并两个流的公共TypeInfo,UnionTypeInfo最终会将Input1,Input2的数据通过map算子转换为该类型
            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
			//转换成union的KeySelector
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);
			//将taggedInput1的数据类容map成UnionTypeInfo<T1, T2>类型
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>());
            taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
            taggedInput1.returns(unionType);
	       //将taggedInput2的数据类容map成UnionTypeInfo<T1, T2>类型
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>());
            taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
            taggedInput2.returns(unionType);
			//将两个流进行union
            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
			//keyBy并且开窗
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                    unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);
			//配置窗口触发器
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
			//配置移除器
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
			//配置allowedLateness
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }
			//创建CoGroupWindowFunction ,并把用户函数传入进去
            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p><b>Note:</b> This is a temporary workaround while the {@link #apply(CoGroupFunction,
         * TypeInformation)} method has the wrong return type and hence does not allow one to set an
         * operator-specific parallelism
         *
         * @deprecated This method will be removed once the {@link #apply(CoGroupFunction,
         *     TypeInformation)} method is fixed in the next major version of Flink (2.0).
         */
        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }
		
        @VisibleForTesting
        Time getAllowedLateness() {
            return allowedLateness;
        }
		//获取窗口包装流,但是标记为VisibleForTesting,用户无法调用,如果可以调用的话可以通过该方法获取包装流之后通过窗口流获取迟到数据的测输出流
        @VisibleForTesting
        WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
            return windowedStream;
        }
    }

2.5CoGroupWindowFunction函数分析

CoGroupWindowFunction也是CoGroupedStreams内部类,负责做CoGroup的业务,最终将数据封装好转发给用户函数(也就是2.1中apply中的cgroupFunction)

   private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

        private static final long serialVersionUID = 1L;

        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
                throws Exception {
			//缓存当前窗口里1号流的数据
            List<T1> oneValues = new ArrayList<>();
			//缓存当前窗口里2号流的数据
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val : values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
			//传入到用户函数中
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }

3.修改源码支持获取迟到数据测输出流

思路 复制CoGroupedStreams新增一个NewCoGroupedStreams,在WithWindow函数中增加方法sideOutputLateData,让用户传入outputTag,用于提取窗口关闭后的测输出流。

3.1复制CoGroupedStreams

二次开发Flink-coGroup算子支持迟到数据通过测输出流提取,# 从0到1阅读Flink源码,flink,大数据

3.2新增WithWindow.sideOutputLateData方法

新增该方法,传入outputTag,下图WithWindow构造方法是3.3新增的

    @PublicEvolving
        public WithWindow<T1, T2, KEY, W> sideOutputLateData(
                OutputTag<TaggedUnion<T1, T2>> outputTag) {
            return new WithWindow<>(
                    input1,
                    input2,
                    keySelector1,
                    keySelector2,
                    keyType,
                    windowAssigner,
                    trigger,
                    evictor,
                    allowedLateness,
                    outputTag
            );
        }

3.3新增WithWindow构造方法

新增属性laterDataOutputTag,用来保存构造函数中传入的laterOutputTag

   protected WithWindow(
                DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness,
                OutputTag<TaggedUnion<T1, T2>> laterOutputTag
        ) {
            this(
                    input1,
                    input2,
                    keySelector1,
                    keySelector2,
                    keyType,
                    windowAssigner,
                    trigger,
                    evictor,
                    allowedLateness);
            this.lateDataOutputTag = laterOutputTag;

        }

3.4修改apply方法

判断lateDataOutputTag 是否为null,如果不为null则调用windowedStream的sideOutputLateData设置迟到数据tag

 /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p>Note: This method's return type does not support setting an operator-specific
         * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
         * parallelism.
         */
        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);

            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>());
            taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
            taggedInput1.returns(unionType);

            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>());
            taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
            taggedInput2.returns(unionType);

            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // we explicitly create the keyed stream to manually pass the key type information in
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                            unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }
            //判断lateDataOutputTag是否为NULL,如果不为NULL,则调用windowedStream
            //的sideOutputLateData方法,传入lateDataOutputTag让迟到数据输出到测输出流中
            if (lateDataOutputTag != null) {
                windowedStream.sideOutputLateData(lateDataOutputTag);
            }
            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

3.5开放UnionTypeInfo类的public权限

该类就是union之后的公共类的类型 oneType代表Input1流的数据类型,TwoType代表Input2流的数据类型

3.6编译Flink源码flink-streaming-java模块

进入到flink-streaming-java所在磁盘目录输入以下命令编译

mvn clean install -DskipTests -Dfast

二次开发Flink-coGroup算子支持迟到数据通过测输出流提取,# 从0到1阅读Flink源码,flink,大数据

编译成功

3.7项目中查看maven是否已经刷新为最新代码

编译之后,可以看到导入的maven包已经有了新增的NewCoGroupedStreams类了,注意项目中的maven依赖中的flink版本,要与编译源码的版本一致,否则无法引入到。

二次开发Flink-coGroup算子支持迟到数据通过测输出流提取,# 从0到1阅读Flink源码,flink,大数据

4.测试

新建两个流,通过new NewCoGroupedStreams创建对象,在allowedLateness之后通过sideOutputLateData设置outputTag,然后通过with方法触发业务,with底层也是调用了apply,只不过他帮我们把返回的流转为了SingleOutputStreamOperator类型,可以用于提取测输出流。最后通过with.getSideOutput(outputTag)提取测输出流,最后通过map转换为 Tuple2<Integer, WaterSensor> 类型进行打印

    OutputTag<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>> outputTag = new OutputTag<>("later",
                new NewCoGroupedStreams.UnionTypeInfo<>(Types.POJO(WaterSensor.class), Types.POJO(WaterSensor.class)));
        
        NewCoGroupedStreams<WaterSensor, WaterSensor> newCgroupStream = new NewCoGroupedStreams<>(ds1, ds2);
        
        SingleOutputStreamOperator<String> with = newCgroupStream.where((x) -> x.getId()).equalTo(x -> x.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(outputTag)
                .with(new RichCoGroupFunction<WaterSensor, WaterSensor, String>() {
                    @Override
                    public void coGroup(Iterable<WaterSensor> first, Iterable<WaterSensor> second, Collector<String> out) throws Exception {
                        out.collect(first.toString() + "======" + second.toString());
                    }
                });
        with.print();
        with.getSideOutput(outputTag).map(new MapFunction<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>, Tuple2<Integer, WaterSensor>>() {
            @Override
            public Tuple2<Integer, WaterSensor> map(NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor> value) throws Exception {
                return value.isOne() ? Tuple2.of(1, value.getOne()) : Tuple2.of(2, value.getTwo());
            }
        }).print();

可以看到下图结果,ts代表时间戳,第一个打印是RichCoGroupFunction打印,代表关闭了1~10s的时间窗,后面我们在输入,WaterSensor{id='a', ts=1, vc=1} 就通过测输出流打印为二元组了

二次开发Flink-coGroup算子支持迟到数据通过测输出流提取,# 从0到1阅读Flink源码,flink,大数据文章来源地址https://www.toymoban.com/news/detail-847153.html

到了这里,关于二次开发Flink-coGroup算子支持迟到数据通过测输出流提取的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【API篇】十一、Flink水位线传递与迟到数据处理

    上游task处理完水位线,时钟改变后,要把数据和当前水位线继续往下游算子的task发送。当一个任务接收到多个上游并行任务传递来的水位线时,以 最小的那个作为当前任务的事件时钟 。如图:上游算子并行度为4,: 总结: 接收到上游多个,取最小 往下游多个发送,广播

    2024年02月08日
    浏览(36)
  • 二次开发DataX以支持HIVE分区表

            最近在一个大数据的项目开发中使用到了数据同步工具DataX,但在使用过程中发现了DataX对HIve分区表的支持不太友好。         具体体现在将数据库中的数据同步到HIVE分区表时,写入目录为HIVE表分区为dt=XXXX,如果不提前创建该分区,会报目录不存在的错误,如

    2024年02月16日
    浏览(35)
  • [开源]一个低代码引擎,支持在线实时构建低码平台,支持二次开发

    TinyEngine低代码引擎使能开发者定制低代码平台,支持在线实时构建低码平台,支持二次开发或集成低码平台能力。 使用MIT开源协议 TinyEngine是一个低代码引擎,基于这个引擎可以构建或者开发出不同领域的低代码平台。 跨端跨框架前端组件 支持在线实时构建、支持二次开发

    2024年02月07日
    浏览(28)
  • Debian二次开发网关支持Docker+RS485+网口

    随着物联网技术的不断发展,瑞芯微边缘计算网关作为一种集成多种接口和功能的智能网关,逐渐成为了物联网领域中的热门产品。本文将详细介绍瑞芯微边缘计算网关的特点和优势,并探讨其在实际应用中的广泛应用。 瑞芯微Linux系统边缘计算网关是一种具有高性能、低功

    2024年02月11日
    浏览(31)
  • calcite在flink中的二次开发,介绍解析器与优化器

    关于calcite的概念相关的内容,在我另一篇帖子 深入理解flinksql执行流程,扩展解析器实现语法的扩展 首先阐述一下 codegen: Codegen是基于ObjectWeb ASM的低开销的java代码生成器,他可以根据预先填好的规则与条件,通过编译代码,自动生成java类 在递归调用各个节点 DataStreamRel 的

    2024年02月22日
    浏览(41)
  • Java+springboot+avue医院绩效考核系统源码支持二次开发

    公立医院改革要求建立公立医疗卫生机构绩效考核体系,借助绩效考核来引导各级公立医院把社会效益摆在首位,提高医疗服务质量,规范医疗服务行为,加强医院内部管理,促进医院高质量发展 医院绩效考核系统,建立以医院发展目标为导向,以医务人员劳动价值、工作量

    2024年02月05日
    浏览(33)
  • 高性能、可扩展、支持二次开发的企业电子招标采购系统源码

    在数字化时代,企业需要借助先进的数字化技术来提高工程管理效率和质量。招投标管理系统作为企业内部业务项目管理的重要应用平台,涵盖了门户管理、立项管理、采购项目管理、采购公告管理、考核管理、报表管理、评审管理、企业管理、采购管理和系统管理等多个方

    2024年01月23日
    浏览(41)
  • 【Java】智慧工地管理系统源代码,支持二次开发,SaaS模式

    智慧工地系统围绕工程现场人、机、料、法、环及施工过程中质量、安全、进度、成本等各项数据满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效。 1、施工现场管理难:安全事故频发,人工巡检难度大,质量进度协同难等问题仍没有得到解决; 2.人员管理

    2024年02月04日
    浏览(30)
  • Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)

                           星光下的赶路人star的个人主页                        将自己生命力展开的人,他的存在,对别人就是愈疗 1、从《星球大战》说起 为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例

    2024年02月07日
    浏览(35)
  • 谱图论:Laplacian二次型和Markov转移算子

    以下部分是我学习CMU 15-751: TCS Toolkit的课堂笔记。由于只是个人笔记,因此许多地方在推导上可能不那么严谨,还望理论大佬多多包涵。 在本文中,我们将研究对象限定在无向图(undirected graph) (G=(V, E)) ,且满足: 有限(finite); 允许重边和自环; 不允许度为0的顶点(即

    2024年02月08日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包