0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

这篇具有很好参考价值的文章主要介绍了0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分
0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows),大数据,python,大数据,flink
那么有没有办法让上图中(B,2)和(D,5)也会被计算呢?
这就可以使用本节介绍的时间滚动窗口。它不依赖于窗口中元素的个数,而是窗口的时间,即窗口时间到了,计算就会进行。
我们稍微修改下《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》的例子,让元素集中在“A”上。

map

class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),
                   ("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 

reduce

    # reducing
    reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

这儿我们的Window使用的是滚动时间窗口,其中参数Time.milliseconds(2)是指窗口时长,即2毫秒一个窗口。
我们运行多次代码可以得到不同的结果

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698771761164, end=1698771761166)
(A,12)
(‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771761166, end=1698771761168)
(A,8)

0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows),大数据,python,大数据,flink

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) TimeWindow(start=1698771731386, end=1698771731388)
(A,16)
(‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771731388, end=1698771731390)
(A,4)

0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows),大数据,python,大数据,flink

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771714992, end=1698771714994)
(A,20)

0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows),大数据,python,大数据,flink

可以发现结果并不稳定。但是可以发现,每个元素都参与了计算,而不像个数滚动窗口那样部分数据没有被触发计算。文章来源地址https://www.toymoban.com/news/detail-735433.html

完整代码

from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindows
   
class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),
                   ("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 
    
    # reducing
    reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.TumblingProcessingTimeWindows.html#pyflink.datastream.window.TumblingProcessingTimeWindows

到了这里,关于0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 0基础学习PyFlink——用户自定义函数之UDTAF

    在前面几篇文章中,我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数:UDTAF——用户自定义表值聚合函数。 UDTAF函数即具备了UDTF的特点,也具备UDAF的特点。即它可以像《0基础学习PyFlink——用户自定义函数之UDTF》介绍的UDTF那样 可以返回任

    2024年02月08日
    浏览(52)
  • 0基础学习PyFlink——用户自定义函数之UDAF

    在前面几篇文章中,我们学习了非聚合类的用户自定义函数。这节我们将介绍最简单的聚合函数UDAF。 我们对比下UDAF和UDF的定义 可以发现: udaf比udf多了一个参数accumulator_type udaf比udf少了一个参数udf_type accumulator中文是“累加器”。我们可以将其看成聚合过后(比如GroupBy)的

    2024年02月08日
    浏览(44)
  • 0基础学习PyFlink——用户自定义函数之UDTF

    在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF 我们对比下UDF和UDTF 可以发现: UDF比UDTF多了func_type和udf_type参数; UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str]; 特别是最后一点,可以认为是UDF和UD

    2024年02月07日
    浏览(34)
  • 0基础学习PyFlink——用户自定义函数之UDF

    PyFlink中关于用户定义方法有: UDF:用户自定义函数。 UDTF:用户自定义表值函数。 UDAF:用户自定义聚合函数。 UDTAF:用户自定义表值聚合函数。 这些字母可以拆解如下: UD表示User Defined(用户自定义); F表示Function(方法); T表示Table(表); A表示Aggregate(聚合); Aggr

    2024年02月08日
    浏览(47)
  • Selenium基础 — Selenium操作浏览器窗口滚动条

    1、为什么操作滚动条 在HTML页面中,由于前端技术框架的原因,页面中的一些元素为动态显示,元素根据滚动条的下拉而被加载。 例如:页面注册同意条款,需要滚动条到最底层,才能点击同意。 2、Selenium如何操作滚动条 Selenium的WebDriver类库中并没有直接提供对滚动条进行操

    2024年02月02日
    浏览(43)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月01日
    浏览(47)
  • 《Flink学习笔记》——第六章 Flink的时间和窗口

    6.1 时间语义 6.1.1 Flink中的时间语义 对于一台机器而言,时间就是系统时间。但是Flink是一个分布式处理系统,多台机器“各自为政”,没有统一的时钟,各自有各自的系统时间。而对于并行的子任务来说,在不同的节点,系统时间就会有所差异。 我们知道一个集群有JobMana

    2024年02月11日
    浏览(40)
  • Linux 硬件时间(RTC time)、系统时间(UTC时间、Universal time)、本地时间(Local time)、时区(Time zone)与夏令时(DST)解析

    处理和管理时间是计算机科学的重要方面,但也是最复杂和容易混淆的方面之一。本文将详细介绍硬件时间、系统时间(UTC时间)、本地时间、时区和夏令时,希望能帮助读者更好地理解这些概念。 1.1 硬件时间简介 硬件时间,也被称为实时时钟(RTC),是指计算机主板上的

    2024年02月04日
    浏览(38)
  • 简述建立时间(setup time) 和 保持时间(hold time)

    本文是根据参考了网上多篇帖子和书籍,对于有关建立时间和保持时间知识点讲解的基础上进行归纳总结,如有错误敬请批评指正! 一、概念         建立时间和保持时间都是基于触发器而言,所以在了解建立时间和保持时间之前,需要对触发器进行分析,本文从D触发器

    2024年01月16日
    浏览(46)
  • QT添加窗口滚动条

    QT UI界面太大,在笔记本小屏幕上显示不全,增加窗口滚动条可以拖到窗口来显示 如果已经有设计的UI,需要先全选然后剪切到粘贴板上 选择QT Creator左侧类栏中的Scroll Area,拖放到UI上,调整大小覆盖UI 粘贴之前剪切的UI到Scroll Area之上。 修改scrollArea其中三个属性,具体如下

    2024年02月11日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包