What the Flink StateDescriptor Name Is Used For

mac, 2024-08-18

Conclusion: ColumnFamily name == ColumnFamilyDescriptor name == StateDescriptor name

Explanation:

// "test" is the StateDescriptor name
kuduErrorDescriptor = new ListStateDescriptor<>(
        "test", TypeInformation.of(new TypeHint<Object>() { }));
kuduErrorDescriptor.enableTimeToLive(ttlConfig);
listState = getRuntimeContext().getListState(kuduErrorDescriptor);

Anyone who has used Flink keyed state will recognize the code above. The "test" in that code is the StateDescriptor name. But why does ColumnFamily name == ColumnFamilyDescriptor name == StateDescriptor name hold?

Take ListState as an example. Following getRuntimeContext().getListState(kuduErrorDescriptor) through the call chain leads to the tryRegisterKvStateInformation method in RocksDBKeyedStateBackend:

......
// First time this state is requested:
// stateDesc.getName() is assigned to the name of newMetaInfo
newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
        stateDesc.getType(),
        stateDesc.getName(),
        namespaceSerializer,
        stateSerializer,
        StateSnapshotTransformFactory.noTransform());
newRocksStateInfo = RocksDBOperationUtils.createStateInfo(
        newMetaInfo, db, columnFamilyOptionsFactory, ttlCompactFiltersManager);
......

Following createStateInfo:

// The name of metaInfoBase is assigned to the columnFamilyDescriptor name
ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor(
        metaInfoBase, columnFamilyOptionsFactory, ttlCompactFiltersManager);
return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(
        createColumnFamily(columnFamilyDescriptor, db), metaInfoBase);
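The body of createColumnFamilyDescriptor is not shown here. As a rough sketch of the name handling it would need to perform, the snippet below assumes the state name is encoded as UTF-8 bytes and that a name equal to RocksDB's default column family ("default") is rejected. ColumnFamilyNameSketch and toColumnFamilyName are hypothetical stand-ins written for illustration; they are not Flink or RocksDB code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for the name handling inside createColumnFamilyDescriptor.
// Assumption: the state name becomes the column family name as UTF-8 bytes.
class ColumnFamilyNameSketch {

    // RocksDB's built-in default column family is named "default".
    static final byte[] DEFAULT_COLUMN_FAMILY =
            "default".getBytes(StandardCharsets.UTF_8);

    // Turns a state name into the byte[] used as the column family name.
    static byte[] toColumnFamilyName(String stateName) {
        byte[] nameBytes = stateName.getBytes(StandardCharsets.UTF_8);
        // Assumed guard: a state must not reuse the default column family's name.
        if (Arrays.equals(DEFAULT_COLUMN_FAMILY, nameBytes)) {
            throw new IllegalStateException(
                    "State name 'default' collides with the default column family");
        }
        return nameBytes;
    }

    public static void main(String[] args) {
        byte[] cfName = toColumnFamilyName("test");
        // Decoding the bytes gives back the original state name.
        System.out.println(new String(cfName, StandardCharsets.UTF_8));
    }
}
```

Under these assumptions, the bytes handed to RocksDB decode back to exactly the string passed to the StateDescriptor constructor, which is why the two names coincide.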

Following createColumnFamily down into RocksDB's own createColumnFamily:

public ColumnFamilyHandle createColumnFamily(
        final ColumnFamilyDescriptor columnFamilyDescriptor) throws RocksDBException {
    return new ColumnFamilyHandle(this, createColumnFamily(nativeHandle_,
            // Pass in the columnFamilyDescriptor name
            columnFamilyDescriptor.columnFamilyName(),
            columnFamilyDescriptor.columnFamilyOptions().nativeHandle_));
}

This in turn calls a native method:

// Creates the column family using the columnFamilyDescriptor name
private native long createColumnFamily(final long handle,
        final byte[] columnFamilyName, final long columnFamilyOptions)
        throws RocksDBException;
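To make the whole chain easier to see in one place, here is a toy model of the name flow traced above. It is a sketch only: NameFlowSketch, ToyDb, ToyBackend and tryRegister are hypothetical names invented for this example, the "database" is a plain in-memory map, and the lookup of already registered states by name is an assumption about what happens when a state is not being requested for the first time.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the call chain; none of these classes are Flink or RocksDB classes.
class NameFlowSketch {

    // Stand-in for a RocksDB instance: column families keyed by their name.
    static final class ToyDb {
        final Map<String, Long> columnFamilies = new LinkedHashMap<>();
        private long nextHandle = 1;

        // Mirrors the shape of the native call: the name arrives as bytes.
        long createColumnFamily(byte[] columnFamilyName) {
            String name = new String(columnFamilyName, StandardCharsets.UTF_8);
            if (columnFamilies.containsKey(name)) {
                throw new IllegalStateException("Column family already exists: " + name);
            }
            long handle = nextHandle++;
            columnFamilies.put(name, handle);
            return handle;
        }
    }

    // Stand-in for the keyed state backend.
    static final class ToyBackend {
        final ToyDb db = new ToyDb();
        // Assumption: registered states are looked up by StateDescriptor name.
        final Map<String, Long> kvStateInformation = new LinkedHashMap<>();

        long tryRegister(String stateDescriptorName) {
            Long existing = kvStateInformation.get(stateDescriptorName);
            if (existing != null) {
                return existing; // not the first request: reuse the column family
            }
            // StateDescriptor name -> meta info name
            String metaInfoName = stateDescriptorName;
            // meta info name -> ColumnFamilyDescriptor name (as bytes)
            byte[] columnFamilyDescriptorName =
                    metaInfoName.getBytes(StandardCharsets.UTF_8);
            // ColumnFamilyDescriptor name -> column family name
            long handle = db.createColumnFamily(columnFamilyDescriptorName);
            kvStateInformation.put(stateDescriptorName, handle);
            return handle;
        }
    }

    public static void main(String[] args) {
        ToyBackend backend = new ToyBackend();
        backend.tryRegister("test");
        backend.tryRegister("test"); // second request reuses the same column family
        System.out.println(backend.db.columnFamilies.keySet());
    }
}
```

In this model, registering the state "test" twice leaves exactly one column family, and its name is the StateDescriptor name. One practical reading of the conclusion is that two state descriptors sharing a name within the same backend would map to the same column family, so state names should be chosen to be unique.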