【Apache Flink】实现有状态函数

这篇具有很好参考价值的文章主要介绍了【Apache Flink】实现有状态函数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在RuntimeContext 中声明键值分区状态

Flink为键值分区状态(Keyed State)提供了几种不同的原语(数据类型)。这是因为不同的算法和操作可能需要管理不同类型的状态。其中一些原语包括:

  1. ValueState: 这种状态类型用于存储单个的,可能更新的值。常见的用途包括存储计数器或聚合。

  2. ListState: 这种状态用于存储一组元素(通常是元素的长列表)。借助此状态,可以简单地追加元素和迭代所有元素。

  3. ReducingStateAggregatingState<IN, OUT>: 这两种状态都用于合并元素,通常在窗口操作中使用。

    • ReducingState:将添加的元素与现有元素通过reduce函数进行合并,最后只会保留一个元素,即合并的结果。

    • AggregatingState:与ReducingState类似,但是其可以存储转换后的聚合结果,而不是输入元素。

  4. MapState<UK, UV>: 这种状态类型存储一个key-value映射。

要使用某一类型的 keyed state,需要提供一个 StateDescriptor,用于声明状态的名称和类型。然后可以通过 RuntimeContext 获取状态。

这些状态类型都是接口,并将存储后端(Flink提供了内存和RocksDB两种用于存储状态的后端)的具体实现细节隔离出来,因此用户可以不用关心状态是如何存储和访问的。

Flink 的键控状态使我们能够通过简单的API调用,就能够很自然地处理键控数据流,我们只需要关心特定键的当前事件和状态,Flink 框架会自动地处理状态的分布式存储和故障恢复等

我们需要了解在 Flink 中,RuntimeContext 提供访问在运行期间的任务 (比如 Map、Reduce 或 Filter function) 可以访问的上下文信息,例如任务的并行度,任务名称,任务 ID,输入和输出信号等。此外,RuntimeContext 还为用户代码提供了生成和维护分布式累加器和键值状态的方法。

在 Apache Flink 中,键值状态(Keyed State)是一种类型的状态,它是以 key 为中心的。每一个 key 都可以对应一个状态。我们可以在 Flink 算子的open()方法中通过 RuntimeContext 获取和初始化它。

举个例子,假设我们正在构建一个实时的网络游戏分析系统,我们可能关注每位玩家的实时得分,这个得分基于他们在游戏中执行的动作(例如完成一项任务,击败一个敌人等)。在这个场景中,每个玩家的ID就是一个 "键",同时他们的游戏得分就是与键关联的 "状态"。当玩家在游戏中执行动作时,我们需要调整他们的分数状态

然后,我们的 Flink 代码可以定义一个 RichMapFunction 来维护每个玩家的分数状态:

public class PlayerScoreFunction extends RichMapFunction<GameEvent, Tuple2<String, Long>> {
  
  // 定义键控状态
  private transient ValueState<Long> scoreState;

  @Override
  public void open(Configuration params) throws Exception {
    ValueStateDescriptor<Long> descriptor = 
        new ValueStateDescriptor<>(
          "playerScore", // 状态的名称
          TypeInformation.of(new TypeHint<Long>() {}),
          0L); // 默认值
    scoreState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public Tuple2<String, Long> map(GameEvent gameEvent) throws Exception {
    // update the state
    long currentScore = scoreState.value();
    currentScore += gameEvent.getScore();
    scoreState.update(currentScore);
    // return the updated score
    return new Tuple2<>(gameEvent.getPlayerId(), currentScore);
  }
}

在这个例子中,PlayerScoreFunction 接收 GameEvent 流,这是玩家在游戏中的各种动作生成的事件。我们将玩家的 ID 作为键来处理这个流。通过 getRuntimeContext().getState(descriptor) 我们获得了状态。然后我们在每次新的 GameEvent 到来时,根据事件中的分数增量用 scoreState.update(currentScore) 更新状态,然后将更新后的得分以及玩家的 ID 一起输出给下一个算子,例如,连接到实时的游戏分数仪表盘,将每个玩家的最新得分显示给观众看。

通过ListCheckPonitend 接口实现算子列表状态

算子状态(Operator State)在流处理系统(比如 Apache Flink)中,是一种特殊类型的状态,针对的是整个算子,而不是特定的键值。它存储的是某一特定算子的所有记录的全局信息。

算子状态的维护主要包括以下步骤:

  1. 定义算子状态:首先,我们需要在处理函数中定义一个或多个算子状态。我们可以指定算子状态的名字,并定义它存储的数据类型。

  2. 读取和写入算子状态:一旦定义了算子状态,我们就可以在流处理函数中对它进行读取和写入。读取算子状态通常在需要根据状态信息做出处理决策时进行。写入算子状态通常在我们需要更新状态信息时进行。

  3. 保持状态一致:为了保持状态的一致性,我们需要定期将算子状态进行快照(Snapshot)并保存到远程存储系统中。在系统中断后,我们可以从最新的快照恢复算子状态。

  4. 状态恢复:在系统中断后,我们可以使用保存的快照恢复算子状态,恢复流处理的执行。

维护算子状态的方法可能会根据具体的流处理系统有所不同,但基本原理是相同的。这四步是维护算子状态的基本过程。

在 Flink 中,ListState 是 CheckpointedState 的一种。ListState 可以为每一条数据保存不止一个值,也就是说,所有的数据都会添加到该状态中。在故障恢复时,这些元素按添加的顺序重放。我们从 CheckpointedFunctionListCheckpointed 接口的抽象类型继承,然后实现 snapshotStaterestoreState 方法,以完成状态恢复。

具体来说,如果我们想使用 ListCheckpointed 接口实现算子列表状态,可以参考以下的代码:

我们每次接收到未序列化的 String 类型的数值,就把它转成 Integer 类型存储在一个列表(List)中。在每个 Checkpoint 操作当中,通过 snapshotState 方法进行状态的快照并返回。当故障发生后,Flink 会调用 restoreState 方法将状态恢复回来。

如果算子是并行的,Flink 会为每一个子任务调用 restoreState 方法,并在算子的每个子任务中创建一个新的列表状态实例。在故障后进行状态恢复时,Flink 将提取快照并将其分发到每个子任务。

public class ListStateFunction extends RichMapFunction<String, Integer> implements ListCheckpointed<Integer> {

  private List<Integer> bufferElements;

  public ListStateFunction(){
    this.bufferElements = new ArrayList<>();
  }

  @Override
  public Integer map(String value) throws Exception {
    int parsedValue = Integer.parseInt(value);
    bufferElements.add(parsedValue);
    return bufferElements.size();
  }

  // 每次 checkpoint 时,将缓存的元素进行快照
  @Override
  public List<Integer> snapshotState(long checkpointId, long timestamp)  {
    return this.bufferElements;
  }

  // 从存储中恢复状态
  @Override
  public void restoreState(List<Integer> state) {
    this.bufferElements.addAll(state);
  }
}

使用 ListCheckpointed 还是 CheckpointedFunction 取决于特定的需求和上下文,两者在功能上是相似的,但 CheckpointedFunction 提供了更多的灵活性,可以让你自己决定如何存储和恢复状态以及存储于哪种类型的状态后端。

使用CheckpointedFunction接口

Apache Flink提供了一个特殊的接口CheckpointedFunction,可以在自定义函数中使用它来操作和管理算子状态。这个接口会在检查点(checkpoint)操作时触发,允许访问和编辑操作员状态。

h使用CheckpointedFunction的例子:

public class CountWithCheckpoint implements CheckpointedFunction, MapFunction<Long, Long> {

  private transient ValueState<Long> counter;

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    ValueStateDescriptor<Long> descriptor = 
      new ValueStateDescriptor<>("counter", TypeInformation.of(new TypeHint<Long>() {}));
    counter = getRuntimeContext().getState(descriptor);
  }

  @Override
  public Long map(Long value) throws Exception {
    Long currentCount = counter.value();
    Long newCount = currentCount == null ? 1L : currentCount + 1;
    counter.update(newCount);
    return newCount;
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    counter.clear();
  }
}

此示例创建一个计数但在每个检查点清空的函数。initializeState()方法会在各种生命周期事件(例如,开始和恢复)时调用并初始化状态变量。然后在map()方法中,状态被更新。snapshotState()在checkpoint操作时触发,这里我们仅清空状态,无任何持久化操作。

在操作和维护算子状态时,我们需要考虑状态的一致性和恢复,以处理可能的故障和中断。实际中可能会对snapshotState()方法更复杂的逻辑,比如将状态存储至远端。

接收检查点完成通知

在Apache Flink中,当所有任务成功从接头位置创建检查点后,作业管理器将坐标控制条以通知所有任务检查点的成功完成。然后,所有任务都会得到一个新的检查点的完成通知。

如果要接收这样的通知并对其做出反应,可以让你的RichFunction实现CheckpointListener接口。以下是一个基本示例:

函数使用ListState进行状态管理,每个接收到的元素都会被添加到状态中。并且,我们实现了notifyCheckpointComplete(long checkpointId)函数,以便在每次成功完成检查点后接收到通知。这个函数里你可以进行一些操作如清除状态、更新外部系统等。

触发的notifyCheckpointComplete方法是在下一次checkpoint发生在Task周的快照操作之前,具体的实现要根据你的检查点配置和故障恢复能力进行规划。文章来源地址https://www.toymoban.com/news/detail-720121.html

public class MyFunction extends RichMapFunction<Long, Long> implements CheckpointListener {

    private transient ListState<Long> checkpointedState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Long> descriptor = 
            new ListStateDescriptor<>("state", Long.class);
        checkpointedState = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public Long map(Long value) throws Exception {
        checkpointedState.add(value);
        return value;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // 监听到检查点成功完成的通知,此处可以进行相关逻辑处理
    }
}

参考文档

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/

到了这里,关于【Apache Flink】实现有状态函数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据-玩转数据-Flink状态编程(上)

    有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。 SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。 Flink的状态管理是它的优

    2024年02月09日
    浏览(44)
  • 数据架构的实时分析:Apache Flink 和 Apache Storm 的比较

    实时数据处理在大数据领域具有重要意义,它可以帮助企业更快地获取和分析数据,从而更快地做出决策。随着数据量的增加,传统的批处理方法已经不能满足企业的需求,因此需要使用实时数据处理技术。 Apache Flink 和 Apache Storm 是两个流行的实时数据处理框架,它们都可以

    2024年01月23日
    浏览(53)
  • 大数据-玩转数据-Flink状态后端(下)

    每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(

    2024年02月09日
    浏览(44)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(78)
  • Apache Hudi初探(三)(与flink的结合)--flink写hudi的操作(真正的写数据)

    在之前的文章中Apache Hudi初探(二)(与flink的结合)–flink写hudi的操作(JobManager端的提交操作) 有说到写hudi数据会涉及到 写hudi真实数据 以及 写hudi元数据 ,这篇文章来说一下具体的实现 这里的操作就是在 HoodieFlinkWriteClient.upsert 方法: initTable 初始化HoodieFlinkTable preWrite 在这里几乎没

    2024年02月10日
    浏览(34)
  • [大数据 Flink,Java实现不同数据库实时数据同步过程]

    目录 🌮前言: 🌮实现Mysql同步Es的过程包括以下步骤: 🌮配置Mysql数据库连接 🌮在Flink的配置文件中,添加Mysql数据库的连接信息。可以在flink-conf.yaml文件中添加如下配置: 🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代

    2024年02月10日
    浏览(40)
  • 流数据湖平台Apache Paimon(三)Flink进阶使用

    2.9.1 写入性能 Paimon的写入性能与检查点密切相关,因此需要更大的写入吞吐量: 增加检查点间隔,或者仅使用批处理模式。 增加写入缓冲区大小。 启用写缓冲区溢出。 如果您使用固定存储桶模式,请重新调整存储桶数量。 2.9.1.1 并行度 建议sink的并行度小于等于bucket的数量

    2024年02月09日
    浏览(32)
  • 流数据湖平台Apache Paimon(二)集成 Flink 引擎

    Paimon目前支持Flink 1.17, 1.16, 1.15 和 1.14。本课程使用Flink 1.17.0。 环境准备 2.1.1 安装 Flink 1)上传并解压Flink安装包 tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/ 2)配置环境变量 2.1.2 上传 jar 包 1)下载并上传Paimon的jar包 jar包下载地址:https://repository.apache.org/snapshots/org/apache/pa

    2024年02月09日
    浏览(44)
  • 【大数据-实时流计算】图文详解 Apache Flink 架构原理

    目录 Apache Flink架构介绍 一、Flink组件栈 二、Flink运行时架构 在Flink的整个

    2024年02月02日
    浏览(41)
  • 【大数据】深入浅出 Apache Flink:架构、案例和优势

    Apache Flink 是一个强大的开源流处理框架,近年来在大数据社区大受欢迎。它允许用户实时处理和分析大量流式数据,使其成为 欺诈检测 、 股市分析 和 机器学习 等现代应用的理想选择。 在本文中,我们将详细介绍什么是 Apache Flink 以及如何使用它来为您的业务带来益处。

    2024年01月17日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包