Flink中KeyedStateStore实现--怎么做到一个Key对应一个State

这篇具有很好参考价值的文章主要介绍了Flink中KeyedStateStore实现--怎么做到一个Key对应一个State。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

在Flink中有两种基本的状态:Keyed State和Operator StateOperator State很好理解,一个特定的Operator算子共享同一个state,这是实现层面很好做到的。
但是 Keyed State 是怎么实现的?一般来说,正常的人第一眼就会想到:一个task绑定一个Keyd State,从网上随便查找资料就能发现正确的答案是:对于每一个Key会绑定一个State,但是这在Flink中是怎么实现的呢?
注意:这里我们只讲Flink中是怎么实现一个Key对应一个State的,其他细节并不细说,且state的backend为RocksDB

闲说杂谈

我们以ValueState类型的Keyed State举例:


ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
        new ValueStateDescriptor<>(
            "indexState",
            TypeInformation.of(HoodieRecordGlobalLocation.class));
ValueState<HoodieRecordGlobalLocation> indexState = context.getKeyedStateStore().getState(indexStateDesc)
....
indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation())

  • context.getKeyedStateStore().getState是获取对应keyState,最终的调用链如下:

     DefaultKeyedStateStore.getState -> getPartitionedState
                                                  ||
                                                  \/
     RocksDBKeyedStateBackend.getPartitionedState -> getOrCreateKeyedState -> createInternalState -> tryRegisterKvStateInformation
                                                  ||
                                                  \/
    
     RocksDBValueState.create(创建RocksDBValueState)                                                                             
                                          
    

    这里的 tryRegisterKvStateInformation会涉及到RocksDB ColumnFamily的创建:

    RocksDBOperationUtils.createStateInfo -> createColumnFamilyDescriptor 
    // createColumnFamilyDescriptor的部分代码:
    ColumnFamilyOptions options =
                 createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
    if (ttlCompactFiltersManager != null) {
        ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);
    }
    byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
    ...
    return new ColumnFamilyDescriptor(nameBytes, options);
    
    

    其实最终会发现RocksDBColumnFamily是跟ValueStateDescriptor也就是描述符的名字有关的,这就是为什么描述符必须是唯一的,关于RocksDBColumnFamily,可以参考RocksDB 简介
    注意此时返回是key对应的一个State的ColumnFamily,该Family包括该task所有的key的value值

  • indexState.update 这里是更新indexState得值
    因为上一步得到只是该Task所对应的ColumanFamily所对应的所有的values,也就是* Flink中的Key-Groups*,(关于Key-Groups可以参考Apache-Flink深度解析-State)

      public void update(V value) {
         if (value == null) {
             clear();
             return;
         }
    
         try {
             backend.db.put(
                     columnFamily,
                     writeOptions,
                     serializeCurrentKeyWithGroupAndNamespace(),
                     serializeValue(value));
         } catch (Exception e) {
             throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
         }
     }
    

    最终的调用链如下:

    RocksDBValueState.update -> serializeCurrentKeyWithGroupAndNamespace
            ||
            \/
    SerializedCompositeKeyBuilder.buildCompositeKeyNamespace
            ||
            \/
    serializeNamespace(namespace, namespaceSerializer) -> keyOutView.getCopyOfBuffer()   
    
    

    这里的keyOutView.getCopyOfBuffer是会获得的record的key,所以在backend.db.put方法中才会更新对应的Key值。
    但是什么时候Record的key信息会被写入到keyOutView中去呢?

  • Record的key何时被写到keyOutView

    AbstractStreamTaskNetworkInput.emitNext -> processElement
           ||
           \/
    OneInputStreamTask.emitRecord
           ||
           \/
    OneInputStreamOperator.setKeyContextElement -> setKeyContextElement1 -> setKeyContextElement
    
           ||
           \/
    AbstractStreamOperator.setCurrentKey
           ||
           \/
    StreamOperatorStateHandler.setCurrentKey
           ||
           \/
    RocksDBKeyedStateBackend.setCurrentKey
           ||
           \/
    SerializedCompositeKeyBuilder.setCurrentKey -> serializeKeyGroupAndKey
           ||
           \/
    keySerializer.serialize(key, keyOutView);    
    
    

    最后一步keySerializer.serialize(key, keyOutView)一个Record的key就被写到keyOutView中,也就是说对应的key是从每个record中获取的,所以在backend.db.put方法中就能获取到对应的Key

其他

对于keyedStateStore是在哪里初始化的,可以看AbstractStreamOperatorinitializeState方法:

final StreamOperatorStateContext context =
             streamTaskStateManager.streamOperatorStateContext(
                     getOperatorID(),
                     getClass().getSimpleName(),
                     getProcessingTimeService(),
                     this,
                     keySerializer,
                     streamTaskCloseableRegistry,
                     metrics,
                     config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                             ManagedMemoryUseCase.STATE_BACKEND,
                             runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                             runtimeContext.getUserCodeClassLoader()),
                     isUsingCustomRawKeyedState());

stateHandler =
        new StreamOperatorStateHandler(
                context, getExecutionConfig(), streamTaskCloseableRegistry);

这个方法里也包括了keyedStatedBackendoperatorStateBackend等初始化, 具体的细节后续再解析。文章来源地址https://www.toymoban.com/news/detail-498460.html

到了这里,关于Flink中KeyedStateStore实现--怎么做到一个Key对应一个State的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis的key过期策略是怎么实现的

    这是一道经典的Redis面试题,一个Redis中可能存在很多很多的key,这些key中可能有很大一部分都有过期时间,此时Redis服务器咋知道哪些key已经过期,哪些还没过期呢? 如果直接遍历所有的key,这显然是行不通的,效率非常低!! Redis整体的策略是定期删除和惰性删除相结合。

    2024年01月19日
    浏览(29)
  • 怎么做到Kafka顺序读写

    一个大的binlog数据库,还原出来了很多SQL语句 binlog生成SQL语句方式 SQL语句需要顺序执行,因为不顺序执行,比如先新增了一条数据,才有可能修改这条数据,假如先执行修改操作,后执行新增操作,那这个数据就错了 如果表的binlog文件很小,直接执行就可以了; 如果表的

    2024年02月16日
    浏览(53)
  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(50)
  • Flink 启动就报错,但exception没提示。其中一个task failure 该怎么办?

    最近我在生产又遇到一个问题,就是消费着一段时间之后,忽然就不再消费了,但也不报错。观察了几次,我发现时间基本是停留在上下班高峰期数据量最大的时候。我主观猜测可能是同时间进来的数据过多,处理不来导致的。但这个问题我还没来的及思考怎么处理,因此我

    2024年02月16日
    浏览(56)
  • 高防CDN是怎么做到阻止网络攻击的

    什么是高防CDN CDN是什么?CDN是在现有 Internet 中增加的一层新的网络架构,由遍布全国的高性能加速节点构成。这些高性能的服务节点都会按照一定的缓存策略存储您的业务内容,当您的用户向您的某一业务内容发起请求时,请求会被调度至最接近用户的服务节点,直接由服

    2024年02月13日
    浏览(50)
  • 【Linux 内核分析课程作业 1】mmap 实现一个 key-valueMap

    功能要求利用 mmap(虚拟内存映射文件) 机制实现一个带持久化能力的 key-valueMap 系统,至少支持单机单进程访问。(可能用到的 linux API: mmap、msync、mremap、munmap、ftruncate、fallocate 等) 电子版提交方式: 2023 年 11 月 20 日 18:00 前通过西电智课平台提交 提交内容 (1) 源代码,包含必

    2024年02月05日
    浏览(34)
  • MFC中如何使用map获取对应key的Vlaue值

    使用map获取对应key的Vlaue值 引用#include using namespace std; //添加值 UDT_MAP_INT_CSTRING.insert(std::mapint, CString::value_type(1, _T(“a”))); UDT_MAP_INT_CSTRING.insert(std::mapint, CString::value_type(2, _T(“b”))); UDT_MAP_INT_CSTRING.insert(std::mapint, CString::value_type(3, _T(“c”))); UDT_MAP_INT_CSTRING.insert(std::mapint, CS

    2024年02月04日
    浏览(31)
  • Excel部分单元格不能编辑,什么原因?怎么做到的?

    Excel文件打开之后,发现在一个工作表中,有些单元格点击之后没有反应,有些就是正常的。大家可能不知道是什么原因,这个是设置了工作表保护的原因。 大家可能会想,工作表保护是对整个工作表的保护,应该不是保护一部分单元格的吧,其实是可以的。今天和大家分享

    2024年02月02日
    浏览(42)
  • Vue3实现带点击外部关闭对应弹出框(可共用一个变量)

    首先,假设您在单文件组件(SFC)中使用了Vue3,并且有两个div元素分别通过`v-if`和`v-else`来切换显示一个带有`.elpopver`类的弹出组件。在这种情况下,每个弹出组件应当拥有独立的状态管理(例如:各自的isOpen变量)。为了实现点击外部关闭对应弹出框的功能,我们需要为每个组

    2024年01月18日
    浏览(102)
  • 【C++】iota函数 + sort函数实现基于一个数组的多数组对应下标绑定排序

    目录 一、iota函数 1. 函数解析 ​①  迭代器类型(补充) ② 头文件 ③  参数 2. 函数用途与实例 二、sort函数 1、 函数解读 2、实现倒序排列 2.1 greater 与 less 模板参数 2.2  lambda表达式 三、下标绑定排序(zip) --- 833.字符串中的查找与替换 ①  迭代器类型(补充) ForwardIterator :

    2024年02月12日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包