窗口延时、侧输出流数据处理

这篇具有很好参考价值的文章主要介绍了窗口延时、侧输出流数据处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一 、 AllowedLateness API 延时关闭窗口

AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s,AllowedLateness 的时间为2s,那一个10的滚动窗口,0-10这个单位窗口正常的关窗时间应该是超过12s的数据到达之后就关窗。而AllowedLateness 是在12s的基础上继续延长了2s,也就是在14s的时候才真正去关闭 0-10s的窗口,但是在12s的时候会触发窗口计算,从12s之后到14s的数据每到达一个就会触发一次窗口计算。

二 、 OutputTag API 侧输出流

使用 OutputTag API 保证窗口关闭的数据依然可以获取,窗口到达AllowedLateness 时间后将彻底关闭,此时再属于该窗口范围内的数据将会流向 OutputTag 。

       context.collect(new Event("A", "/user", 1000L));
                Thread.sleep(3000);
                context.collect(new Event("B", "/prod", 6500L));
                Thread.sleep(3000);
                context.collect(new Event("C", "/cart", 4000L));
                Thread.sleep(3000);
                context.collect(new Event("D", "/user", 7500L));
                System.out.println("窗口关闭 ~ ");
                Thread.sleep(3000);
                context.collect(new Event("E", "/cente", 8500L));
                Thread.sleep(3000);
                context.collect(new Event("F", "/cente", 4000L));
                Thread.sleep(3000);
                context.collect(new Event("G", "/cente", 9200L));
                Thread.sleep(3000);
                context.collect(new Event("H", "/cente", 1000L));
                Thread.sleep(3000);
                context.collect(new Event("I", "/cente", 1500L));
                Thread.sleep(3000);

如果现在定义一个 5s的
滚动窗口,WaterMark延时时间为2s,AllowedLateness 延时时间为2s,此时相当于是 WaterMark到达9s的时候才会关闭0-5的窗口,也就是说最后两条数据会流向OutputTag . 当4000L数据到达后,会再次触发一次窗口计算。

完全与预期一致。

窗口延时、侧输出流数据处理,大数据计算,开发语言,flink

完整代码:文章来源地址https://www.toymoban.com/news/detail-703782.html

public class WindowOutputTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = Env.getEnv();

        DataStreamSource<Event> dataStreamSource = env.addSource(new SourceFunction<Event>() {
            @Override
            public void run(SourceContext<Event> context) throws Exception {
                context.collect(new Event("A", "/user", 1000L));
                Thread.sleep(3000);
                context.collect(new Event("B", "/prod", 6500L));
                Thread.sleep(3000);
                context.collect(new Event("C", "/cart", 4000L));
                Thread.sleep(3000);
                context.collect(new Event("D", "/user", 7500L));
                System.out.println("窗口关闭 ~ ");
                Thread.sleep(3000);
                context.collect(new Event("E", "/cente", 8500L));
                Thread.sleep(3000);
                context.collect(new Event("F", "/cente", 4000L));
                Thread.sleep(3000);
                context.collect(new Event("G", "/cente", 9200L));
                Thread.sleep(3000);
                context.collect(new Event("H", "/cente", 1000L));
                Thread.sleep(3000);
                context.collect(new Event("I", "/cente", 1500L));
                Thread.sleep(3000);
            }

            @Override
            public void cancel() {

            }
        });

        //operator
        SingleOutputStreamOperator<Event> operator = dataStreamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))// 水位线延时2s
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        })
        );

        OutputTag<Event> eventOutputTag = new OutputTag<Event>("late") {
        };

        WindowedStream<Event, Boolean, TimeWindow> windowedStream = operator.keyBy(d -> true)
                .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
                .allowedLateness(Time.of(2, TimeUnit.SECONDS))
                .sideOutputLateData(eventOutputTag);


        SingleOutputStreamOperator<String> windowAgg = windowedStream.aggregate(new AggregateFunction<Event, Long, Long>() {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long add(Event event, Long acc) {
                return acc + 1;
            }

            @Override
            public Long getResult(Long acc) {
                return acc;
            }

            @Override
            public Long merge(Long aLong, Long acc1) {
                return null;
            }
        }, new ProcessWindowFunction<Long, String, Boolean, TimeWindow>() {
            @Override
            public void process(Boolean key, Context context, Iterable<Long> iterable, Collector<String> collector) throws Exception {
                long start = context.window().getStart();
                long end = context.window().getEnd();
                collector.collect(new Timestamp(start) + " ~ " + new Timestamp(end) + " ===> " + iterable.iterator().next());
            }
        });

        windowAgg.print("窗口数据 ");

        //获取测输出流中的延时数据
        DataStream<Event> sideOutput = windowAgg.getSideOutput(eventOutputTag);
        sideOutput.print("测输出流:-> ");


        env.execute();

    }

}

到了这里,关于窗口延时、侧输出流数据处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 云计算与大数据处理技术_云计算与大数据处理

    AIoT技术分析:云计算一般的计算机技术很难支撑企业的运作,于是云计算顺应时代而生,广泛地应用到了企业中。 云计算的概念 云计算是一种新兴的商业计算模型。... 并支持大规模数据处理、高容错性和自我管理等特性,提供PB级的存储能力,使用结构化的文件来存储数据,并整个

    2024年02月01日
    浏览(66)
  • Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)

                           星光下的赶路人star的个人主页                        将自己生命力展开的人,他的存在,对别人就是愈疗 1、从《星球大战》说起 为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例

    2024年02月07日
    浏览(50)
  • 【Matlab】如何读取文件夹下所有txt数据进行处理并以txt结果更名输出

    如何读取文件夹下所有txt数据进行处理并以txt结果更名输出 目录 前言 一、Matlab中fullfile函数用法 二、使用步骤 1.读取文件夹下所有txt文件并以struct存储变量 2.循环下读取每个txt文件中的数据并进行处理 总结 遇到Matlab需要大批量处理一个文件夹下所有的txt格式,经过信号处

    2024年02月07日
    浏览(74)
  • 008-关于FPGA/ZYNQ直接处理图像传感器数据输出的若干笔记(裸板采集思路)

    最近也是未来需要考虑做的一件事情是,如何通过FPGA/ZYNQ去做显微镜图像观测下的图像采集传输与后续的处理。目前显微镜观测领域通常是以PC端连接工业相机接口,这个接口可以是USB3.0,可以是网口,也可以是其它传输方式。常常通过工业相机输出的为视频流数据,厂商会

    2024年01月23日
    浏览(59)
  • 云计算与大数据处理:实时计算与数据流

    云计算和大数据处理是当今信息技术领域的两个热门话题。随着互联网的普及和人们生活中的各种设备的不断增多,我们生活中的数据量不断增加,这些数据需要存储和处理。云计算是一种基于互联网的计算资源共享和分配模式,可以让用户在需要时轻松获取计算资源,从而

    2024年04月13日
    浏览(46)
  • 数据架构与云计算:如何利用云计算资源进行数据处理

    随着数据的爆炸增长,数据处理和分析成为了企业和组织中的关键技能。云计算是一种新兴的技术,它可以让我们在分布式环境中进行数据处理和分析。在这篇文章中,我们将探讨如何利用云计算资源进行数据处理,以及相关的核心概念、算法原理、具体操作步骤和数学模型

    2024年04月14日
    浏览(44)
  • 数据建模的云计算支持:利用云计算资源提高数据处理效率

    数据建模是数据科学和机器学习领域中的一个重要环节,它涉及到将实际问题转化为数学模型的过程。随着数据规模的不断扩大,传统的数据处理方法已经无法满足需求,因此需要寻找更高效的数据处理方法。云计算是一种基于互联网的计算资源分配和共享方式,它可以提供

    2024年04月28日
    浏览(42)
  • 数据关联分析:云计算与大规模数据处理

    数据关联分析是一种常见的数据挖掘技术,它主要用于发现两个数据集之间的关联关系。随着数据规模的不断增加,传统的关联分析方法已经无法满足大规模数据处理的需求。云计算技术在这里发挥了重要作用,它可以提供高性能的计算资源,以满足大规模数据处理的需求。

    2024年04月23日
    浏览(43)
  • 云计算与大数据处理:数据驱动的决策

    随着互联网的普及和数据的迅速增长,大数据技术已经成为企业和组织的核心竞争力。大数据处理技术涉及到海量数据的收集、存储、处理和分析,以支持企业的决策和优化。云计算是大数据处理的重要技术之一,它为大数据处理提供了高性能、高可扩展性和高可靠性的计算

    2024年04月12日
    浏览(36)
  • 数据挖掘的云计算与大规模数据处理

    数据挖掘是指从大量数据中发现新的、有价值的信息和知识的过程。随着互联网和人工智能技术的发展,数据量不断增加,这使得数据挖掘变得越来越重要。云计算和大规模数据处理技术为数据挖掘提供了强大的支持,使得数据挖掘能够在更短的时间内获得更好的结果。 本文

    2024年04月14日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包