Flink流批一体计算(19):PyFlink DataStream API之State

这篇具有很好参考价值的文章主要介绍了Flink流批一体计算(19):PyFlink DataStream API之State。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

keyed state

Keyed DataStream

使用 Keyed State

实现了一个简单的计数窗口

状态有效期 (TTL)

过期数据的清理

全量快照时进行清理

增量数据清理

在 RocksDB 压缩时清理

Operator State算子状态

Broadcast State广播状态


keyed state

Keyed DataStream

使用 keyed state,首先需要为DataStream指定 key(主键)。这个主键用于状态分区(也会给数据流中的记录本身分区)。

使用 DataStream 中 Java/Scala API 的 keyBy(KeySelector) 或者是 Python API 的 key_by(KeySelector) 来指定 key。它将生成 KeyedStream,接下来允许使用 keyed state 操作。

Keyselector函数接收单条记录作为输入,返回这条记录的 key。该 key 可以为任何类型,但是它的计算产生方式必须是具备确定性的。

Flink的数据模型不基于key-value对,因此实际上将数据集在物理上封装成 key和 value是没有必要的。Key是“虚拟”的。它们定义为基于实际数据的函数,用以操纵分组算子。

使用 Keyed State

keyed state接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的key。

换句话说,这些状态仅可在KeyedStream上使用,在Java/Scala API上可以通过stream.keyBy(...)得到 KeyedStream,在Python API上可以通过 stream.key_by(...) 得到 KeyedStream。

所有支持的状态类型如下所示:

ValueState<T>: 保存一个可以更新和检索的值

Liststate<T>: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。

ReducingState<T>: 保存一个单值,表示添加到状态的所有值的聚合。但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。

所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

实现了一个简单的计数窗口

我们把元组的第一个元素当作 key。 该函数将出现的次数以及总和存储在 “ValueState” 中。

一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。 请注意,我们会为每个不同的 key(元组中第一个元素)保存一个单独的值。

必须创建一个 StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称,状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。

根据不同的状态类型,可以创建ValueStateDescriptor,ListstateDescriptor, AggregatingStateDescriptor, ReducingStateDescriptor 或MapStateDescriptor。

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, FlatMapFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class CountWindowAverage(FlatMapFunction):
    def __init__(self):
        self.sum = None

    def open(self, runtime_context: RuntimeContext):
        descriptor = ValueStateDescriptor(
            "average",  # the state name
            Types.PICKLED_BYTE_ARRAY()  # type information
        )
        self.sum = runtime_context.get_state(descriptor)

    def flat_map(self, value):
        # access the state value
        current_sum = self.sum.value()
        if current_sum is None:
            current_sum = (0, 0)
        # update the count
        current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
        # update the state
        self.sum.update(current_sum)
        # if the count reaches 2, emit the average and clear the state
        if current_sum[0] >= 2:
            self.sum.clear()
            yield value[0], int(current_sum[1] / current_sum[0])


env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([(1, 3), (1, 5), (1, 7), (1, 4), (1, 2)]) \
    .key_by(lambda row: row[0]) \
    .flat_map(CountWindowAverage()) \
    .print()

env.execute()
# the printed output will be (1,4) and (1,5)

状态有效期 (TTL)

任何类型的keyed state都可以有有效期(TTL)。如果配置了TTL且状态值已过期,则会尽最大可能清除对应的值。所有状态类型都支持单元素的TTL。 这意味着列表元素和映射元素将独立到期。

在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象。 然后把配置传递到state descriptor中启用TTL功能。

from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
  .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
  .build()

state_descriptor = ValueStateDescriptor("text state", Types.STRING())
state_descriptor.enable_time_to_live(ttl_config)

TTL配置有以下几个选项:

newBuilder 的第一个参数表示数据的有效期,是必选项。

TTL 的更新策略(默认是 OnCreateAndWrite):

StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新

StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新

数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):

    StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据

    (注意: 在PyFlink作业中,状态的读写缓存都将失效,这将导致一部分的性能损失)

    NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。

StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据

    (注意: 在PyFlink作业中,状态的读缓存将会失效,这将导致一部分的性能损失)

    ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。

过期数据的清理

默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理.

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .disable_cleanup_in_background() \
  .build()

可以配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。

全量快照时进行清理

可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。

该策略可以通过 StateTtlConfig 配置进行配置,这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .cleanup_full_snapshot() \
  .build()

这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。

增量数据清理

现在仅 Heap state backend 支持增量清除机制。

增量式清理状态数据,在状态访问或/和处理时进行。如果没有 state 访问,也没有处理数据,则不会清理过期数据。增量清理会增加数据处理的耗时。

如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。

每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .cleanup_incrementally(10, True) \
  .build()

该策略有两个参数。 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。

第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
  .new_builder(Time.seconds(1)) \
  .cleanup_in_rocksdb_compact_filter(1000) \
  .build()

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。

时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。

RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

注意:

    压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。 对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。

    对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器, 从而确定下一个未过期数据的位置。

    对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。

Operator State算子状态

Python DataStream API 仍无法支持算子状态

算子状态(或者非 keyed 状态)是绑定到一个并行算子实例的状态。Kafka Connector 是 Flink 中使用算子状态一个很具有启发性的例子。

Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。

当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例。处理重分发过程有多种不同的方案。

在典型的有状态 Flink 应用中你无需使用算子状态。它大都作为一种特殊类型的状态使用。用于实现 source/sink,以及无法对 state 进行分区而没有主键的这类场景中。

Broadcast State广播状态

Python DataStream API 仍无法支持广播状态

广播状态是一种特殊的算子状态。引入它的目的在于支持一个流中的元素需要广播到所有下游任务的使用情形。在这些任务中广播状态用于保持所有子任务状态相同。

该状态接下来可在第二个处理记录的数据流中访问。可以设想包含了一系列用于处理其他流中元素规则的低吞吐量数据流,这个例子自然而然地运用了广播状态。

考虑到上述这类使用情形,广播状态和其他算子状态的不同之处在于:

它具有 map 格式,

它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流,

这类算子可以拥有不同命名的多个广播状态 。文章来源地址https://www.toymoban.com/news/detail-686697.html

到了这里,关于Flink流批一体计算(19):PyFlink DataStream API之State的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(60)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(43)
  • Flink流批一体计算(20):DataStream API和Table API互转

    目录 举个例子 连接器 下载连接器(connector)和格式(format)jar 包 依赖管理  如何使用连接器 举个例子 StreamExecutionEnvironment 集成了DataStream API,通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 TableEnvironment 将采用StreamExecutionEnvironment所有的配置选项。 建

    2024年02月10日
    浏览(41)
  • Flink流批一体计算(14):PyFlink Tabel API之SQL查询

    举个例子 查询 source 表,同时执行计算 Table API 查询 Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。 Table API 文档描述了流和批

    2024年02月12日
    浏览(42)
  • Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用来: ·创建 Table ·将 Table 注册成临时表 ·执行 SQL 查询 ·注册用户自定义的 (标量,表值,或者聚合) 函数 ·配置作业 ·管理 Python 依赖 ·提交作业执行 创建 source 表 创建 sink

    2024年02月12日
    浏览(40)
  • Flink流批一体计算(15):PyFlink Tabel API之SQL写入Sink

    目录 举个例子 写入Sink的各种情况 1. 将结果数据收集到客户端 2. 将结果数据转换为Pandas DataFrame,并收集到客户端 3. 将结果写入到一张 Sink 表中 4. 将结果写入多张 Sink 表中 举个例子 将计算结果写入给 sink 表 写入Sink的各种情况 1. 将结果数据收集到客户端 你可以使用 TableR

    2024年02月11日
    浏览(40)
  • 流批一体计算引擎-7-[Flink]的DataStream连接器

    参考官方手册DataStream Connectors 一、预定义的Source和Sink 一些比较基本的Source和Sink已经内置在Flink里。 1、预定义data sources支持从文件、目录、socket,以及collections和iterators中读取数据。 2、预定义data sinks支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 sock

    2023年04月08日
    浏览(40)
  • Flink流批一体计算(1):流批一体和Flink概述

    Apache Flink应运而生 数字化经济革命的浪潮正在颠覆性地改变着人类的工作方式和生活方式,数字化经济在全球经济增长中扮演着越来越重要的角色,以互联网、云计算、大数据、物联网、人工智能为代表的数字技术近几年发展迅猛,数字技术与传统产业的深度融合释放出巨大

    2024年02月10日
    浏览(42)
  • flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)

    前言:今天是学习flink的第四天啦!学习了物理分区的知识点,这一次学习了前4个简单的物理分区,称之为简单分区篇! Tips:我相信自己会越来会好的,明天攻克困难分区篇,加油! 3. 物理分区 3.1 Global Partitioner 该分区器会将所有的数据都发送到下游的某个算子实例(subta

    2024年02月19日
    浏览(39)
  • flink重温笔记(五):Flink 流批一体 API 开发——物理分区(下)

    前言 :今天是学习 flink 的第五天啦! 主要学习了物理分区较难理解的部分,在这个部分的三个分区的学习中, rescale partition 和 forward partition 其原理可以归类 pointwise 模式,其他的 partition 其原理可以归类 all_to_all 模式,而比较有趣的是 custom partitioning,这个可以进行根据值

    2024年02月19日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包