背景
在Flink中有两种基本的状态:Keyed State和Operator State,Operator State很好理解,一个特定的Operator算子共享同一个state,这是实现层面很好做到的。
但是 Keyed State 是怎么实现的?一般来说,正常的人第一眼就会想到:一个task绑定一个Keyd State,从网上随便查找资料就能发现正确的答案是:对于每一个Key会绑定一个State,但是这在Flink中是怎么实现的呢?
注意:这里我们只讲Flink中是怎么实现一个Key对应一个State的,其他细节并不细说,且state的backend为RocksDB
闲说杂谈
我们以ValueState类型的Keyed State举例:
ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
new ValueStateDescriptor<>(
"indexState",
TypeInformation.of(HoodieRecordGlobalLocation.class));
ValueState<HoodieRecordGlobalLocation> indexState = context.getKeyedStateStore().getState(indexStateDesc)
....
indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation())
-
context.getKeyedStateStore().getState是获取对应key的State,最终的调用链如下:
DefaultKeyedStateStore.getState -> getPartitionedState || \/ RocksDBKeyedStateBackend.getPartitionedState -> getOrCreateKeyedState -> createInternalState -> tryRegisterKvStateInformation || \/ RocksDBValueState.create(创建RocksDBValueState)
这里的 tryRegisterKvStateInformation会涉及到RocksDB ColumnFamily的创建:
RocksDBOperationUtils.createStateInfo -> createColumnFamilyDescriptor // createColumnFamilyDescriptor的部分代码: ColumnFamilyOptions options = createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName()); if (ttlCompactFiltersManager != null) { ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options); } byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); ... return new ColumnFamilyDescriptor(nameBytes, options);
其实最终会发现RocksDB的ColumnFamily是跟ValueStateDescriptor也就是描述符的名字有关的,这就是为什么描述符必须是唯一的,关于RocksDB的ColumnFamily,可以参考RocksDB 简介
注意此时返回是key对应的一个State的ColumnFamily,该Family包括该task所有的key的value值 -
indexState.update 这里是更新indexState得值
因为上一步得到只是该Task所对应的ColumanFamily所对应的所有的values,也就是* Flink中的Key-Groups*,(关于Key-Groups可以参考Apache-Flink深度解析-State)public void update(V value) { if (value == null) { clear(); return; } try { backend.db.put( columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace(), serializeValue(value)); } catch (Exception e) { throw new FlinkRuntimeException("Error while adding data to RocksDB", e); } }
最终的调用链如下:
RocksDBValueState.update -> serializeCurrentKeyWithGroupAndNamespace || \/ SerializedCompositeKeyBuilder.buildCompositeKeyNamespace || \/ serializeNamespace(namespace, namespaceSerializer) -> keyOutView.getCopyOfBuffer()
这里的keyOutView.getCopyOfBuffer是会获得的record的key,所以在backend.db.put方法中才会更新对应的Key值。
但是什么时候Record的key信息会被写入到keyOutView中去呢? -
Record的key何时被写到keyOutView中
AbstractStreamTaskNetworkInput.emitNext -> processElement || \/ OneInputStreamTask.emitRecord || \/ OneInputStreamOperator.setKeyContextElement -> setKeyContextElement1 -> setKeyContextElement || \/ AbstractStreamOperator.setCurrentKey || \/ StreamOperatorStateHandler.setCurrentKey || \/ RocksDBKeyedStateBackend.setCurrentKey || \/ SerializedCompositeKeyBuilder.setCurrentKey -> serializeKeyGroupAndKey || \/ keySerializer.serialize(key, keyOutView);
最后一步keySerializer.serialize(key, keyOutView)一个Record的key就被写到keyOutView中,也就是说对应的key是从每个record中获取的,所以在backend.db.put方法中就能获取到对应的Key
其他
对于keyedStateStore是在哪里初始化的,可以看AbstractStreamOperator中initializeState方法:文章来源:https://www.toymoban.com/news/detail-498460.html
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
getClass().getSimpleName(),
getProcessingTimeService(),
this,
keySerializer,
streamTaskCloseableRegistry,
metrics,
config.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.STATE_BACKEND,
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState());
stateHandler =
new StreamOperatorStateHandler(
context, getExecutionConfig(), streamTaskCloseableRegistry);
这个方法里也包括了keyedStatedBackend和operatorStateBackend等初始化, 具体的细节后续再解析。文章来源地址https://www.toymoban.com/news/detail-498460.html
到了这里,关于Flink中KeyedStateStore实现--怎么做到一个Key对应一个State的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!