Java源码示例:org.apache.flink.runtime.state.BackendBuildingException

示例1
/**
 * Recovery from a single remote incremental state without rescaling.
 */
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
	if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
		IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
			(IncrementalRemoteKeyedStateHandle) keyedStateHandle;
		restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
		restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
	} else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
		IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
			(IncrementalLocalKeyedStateHandle) keyedStateHandle;
		restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
		restoreFromLocalState(incrementalLocalKeyedStateHandle);
	} else {
		throw new BackendBuildingException("Unexpected state handle type, " +
			"expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
			", but found " + keyedStateHandle.getClass());
	}
}
 
示例2
private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception {

		assert (initialHandle instanceof IncrementalRemoteKeyedStateHandle);

		// 1. Restore base DB from selected initial handle
		restoreFromRemoteState((IncrementalRemoteKeyedStateHandle) initialHandle);

		// 2. Clip the base DB instance
		try {
			RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
				db,
				columnFamilyHandles,
				keyGroupRange,
				initialHandle.getKeyGroupRange(),
				keyGroupPrefixBytes);
		} catch (RocksDBException e) {
			String errMsg = "Failed to clip DB after initialization.";
			LOG.error(errMsg, e);
			throw new BackendBuildingException(errMsg, e);
		}
	}
 
示例3
private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final long key) throws BackendBuildingException {
	final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
	ExecutionConfig executionConfig = new ExecutionConfig();
	// objects for heap state list serialisation
	final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
		new HeapKeyedStateBackendBuilder<>(
			mock(TaskKvStateRegistry.class),
			LongSerializer.INSTANCE,
			ClassLoader.getSystemClassLoader(),
			keyGroupRange.getNumberOfKeyGroups(),
			keyGroupRange,
			executionConfig,
			TtlTimeProvider.DEFAULT,
			Collections.emptyList(),
			AbstractStateBackend.getCompressionDecorator(executionConfig),
			TestLocalRecoveryConfig.disabled(),
			new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128),
			async,
			new CloseableRegistry()).build();
	longHeapKeyedStateBackend.setCurrentKey(key);
	return longHeapKeyedStateBackend;
}
 
示例4
private static OperatorStateBackend createOperatorStateBackend(
	RuntimeContext runtimeContext,
	Collection<OperatorStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry){

	try {
		return new DefaultOperatorStateBackendBuilder(
			runtimeContext.getUserCodeClassLoader(),
			runtimeContext.getExecutionConfig(),
			false,
			stateHandles,
			cancelStreamRegistry
		).build();
	} catch (BackendBuildingException e) {
		throw new RuntimeException(e);
	}
}
 
示例5
/**
 * Recovery from a single remote incremental state without rescaling.
 */
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
	if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
		IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
			(IncrementalRemoteKeyedStateHandle) keyedStateHandle;
		restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
		restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
	} else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
		IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
			(IncrementalLocalKeyedStateHandle) keyedStateHandle;
		restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
		restoreFromLocalState(incrementalLocalKeyedStateHandle);
	} else {
		throw new BackendBuildingException("Unexpected state handle type, " +
			"expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
			", but found " + keyedStateHandle.getClass());
	}
}
 
示例6
private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception {

		assert (initialHandle instanceof IncrementalRemoteKeyedStateHandle);

		// 1. Restore base DB from selected initial handle
		restoreFromRemoteState((IncrementalRemoteKeyedStateHandle) initialHandle);

		// 2. Clip the base DB instance
		try {
			RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
				db,
				columnFamilyHandles,
				keyGroupRange,
				initialHandle.getKeyGroupRange(),
				keyGroupPrefixBytes);
		} catch (RocksDBException e) {
			String errMsg = "Failed to clip DB after initialization.";
			LOG.error(errMsg, e);
			throw new BackendBuildingException(errMsg, e);
		}
	}
 
示例7
private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final long key) throws BackendBuildingException {
	final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
	ExecutionConfig executionConfig = new ExecutionConfig();
	// objects for heap state list serialisation
	final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
		new HeapKeyedStateBackendBuilder<>(
			mock(TaskKvStateRegistry.class),
			LongSerializer.INSTANCE,
			ClassLoader.getSystemClassLoader(),
			keyGroupRange.getNumberOfKeyGroups(),
			keyGroupRange,
			executionConfig,
			TtlTimeProvider.DEFAULT,
			Collections.emptyList(),
			AbstractStateBackend.getCompressionDecorator(executionConfig),
			TestLocalRecoveryConfig.disabled(),
			new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128),
			async,
			new CloseableRegistry()).build();
	longHeapKeyedStateBackend.setCurrentKey(key);
	return longHeapKeyedStateBackend;
}
 
示例8
@Override
public OperatorStateBackend createOperatorStateBackend(
	Environment env,
	String operatorIdentifier,
	@Nonnull Collection<OperatorStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
	if (operatorIdentifier.contains(DECLINE_SINK_NAME)) {
		return new DeclineSinkFailingOperatorStateBackend(
			env.getExecutionConfig(),
			cancelStreamRegistry,
			new DeclineSinkFailingSnapshotStrategy());
	} else {
		return new DefaultOperatorStateBackendBuilder(
			env.getUserClassLoader(),
			env.getExecutionConfig(),
			false,
			stateHandles,
			cancelStreamRegistry).build();
	}
}
 
示例9
private static OperatorStateBackend createOperatorStateBackend(
	RuntimeContext runtimeContext,
	Collection<OperatorStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry){

	try {
		return new DefaultOperatorStateBackendBuilder(
			runtimeContext.getUserCodeClassLoader(),
			runtimeContext.getExecutionConfig(),
			false,
			stateHandles,
			cancelStreamRegistry
		).build();
	} catch (BackendBuildingException e) {
		throw new RuntimeException(e);
	}
}
 
示例10
/**
 * Recovery from a single remote incremental state without rescaling.
 */
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
	if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
		IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
			(IncrementalRemoteKeyedStateHandle) keyedStateHandle;
		restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
		restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
	} else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
		IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
			(IncrementalLocalKeyedStateHandle) keyedStateHandle;
		restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
		restoreFromLocalState(incrementalLocalKeyedStateHandle);
	} else {
		throw new BackendBuildingException("Unexpected state handle type, " +
			"expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
			", but found " + keyedStateHandle.getClass());
	}
}
 
示例11
private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception {

		assert (initialHandle instanceof IncrementalRemoteKeyedStateHandle);

		// 1. Restore base DB from selected initial handle
		restoreFromRemoteState((IncrementalRemoteKeyedStateHandle) initialHandle);

		// 2. Clip the base DB instance
		try {
			RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
				db,
				columnFamilyHandles,
				keyGroupRange,
				initialHandle.getKeyGroupRange(),
				keyGroupPrefixBytes,
				writeBatchSize);
		} catch (RocksDBException e) {
			String errMsg = "Failed to clip DB after initialization.";
			LOG.error(errMsg, e);
			throw new BackendBuildingException(errMsg, e);
		}
	}
 
示例12
private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final long key) throws BackendBuildingException {
	final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
	ExecutionConfig executionConfig = new ExecutionConfig();
	// objects for heap state list serialisation
	final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
		new HeapKeyedStateBackendBuilder<>(
			mock(TaskKvStateRegistry.class),
			LongSerializer.INSTANCE,
			ClassLoader.getSystemClassLoader(),
			keyGroupRange.getNumberOfKeyGroups(),
			keyGroupRange,
			executionConfig,
			TtlTimeProvider.DEFAULT,
			Collections.emptyList(),
			AbstractStateBackend.getCompressionDecorator(executionConfig),
			TestLocalRecoveryConfig.disabled(),
			new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128),
			async,
			new CloseableRegistry()).build();
	longHeapKeyedStateBackend.setCurrentKey(key);
	return longHeapKeyedStateBackend;
}
 
示例13
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
	Environment env,
	JobID jobID,
	String operatorIdentifier,
	TypeSerializer<K> keySerializer,
	int numberOfKeyGroups,
	KeyGroupRange keyGroupRange,
	TaskKvStateRegistry kvStateRegistry,
	TtlTimeProvider ttlTimeProvider,
	MetricGroup metricGroup,
	@Nonnull Collection<KeyedStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {

	TaskStateManager taskStateManager = env.getTaskStateManager();
	LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
	HeapPriorityQueueSetFactory priorityQueueSetFactory =
		new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

	return new HeapKeyedStateBackendBuilder<>(
		kvStateRegistry,
		keySerializer,
		env.getUserClassLoader(),
		numberOfKeyGroups,
		keyGroupRange,
		env.getExecutionConfig(),
		ttlTimeProvider,
		stateHandles,
		AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
		localRecoveryConfig,
		priorityQueueSetFactory,
		isUsingAsynchronousSnapshots(),
		cancelStreamRegistry).build();
}
 
示例14
@Override
public OperatorStateBackend createOperatorStateBackend(
	Environment env,
	String operatorIdentifier,
	@Nonnull Collection<OperatorStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {

	return new DefaultOperatorStateBackendBuilder(
		env.getUserClassLoader(),
		env.getExecutionConfig(),
		isUsingAsynchronousSnapshots(),
		stateHandles,
		cancelStreamRegistry).build();
}
 
示例15
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
	Environment env,
	JobID jobID,
	String operatorIdentifier,
	TypeSerializer<K> keySerializer,
	int numberOfKeyGroups,
	KeyGroupRange keyGroupRange,
	TaskKvStateRegistry kvStateRegistry,
	TtlTimeProvider ttlTimeProvider,
	MetricGroup metricGroup,
	@Nonnull Collection<KeyedStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {

	TaskStateManager taskStateManager = env.getTaskStateManager();
	HeapPriorityQueueSetFactory priorityQueueSetFactory =
		new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
	return new HeapKeyedStateBackendBuilder<>(
		kvStateRegistry,
		keySerializer,
		env.getUserClassLoader(),
		numberOfKeyGroups,
		keyGroupRange,
		env.getExecutionConfig(),
		ttlTimeProvider,
		stateHandles,
		AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
		taskStateManager.createLocalRecoveryConfig(),
		priorityQueueSetFactory,
		isUsingAsynchronousSnapshots(),
		cancelStreamRegistry).build();
}
 
示例16
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
	Environment env,
	JobID jobID,
	String operatorIdentifier,
	TypeSerializer<K> keySerializer,
	int numberOfKeyGroups,
	KeyGroupRange keyGroupRange,
	TaskKvStateRegistry kvStateRegistry,
	TtlTimeProvider ttlTimeProvider,
	MetricGroup metricGroup,
	@Nonnull Collection<KeyedStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {

	TaskStateManager taskStateManager = env.getTaskStateManager();
	LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
	HeapPriorityQueueSetFactory priorityQueueSetFactory =
		new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

	return new HeapKeyedStateBackendBuilder<>(
		kvStateRegistry,
		keySerializer,
		env.getUserClassLoader(),
		numberOfKeyGroups,
		keyGroupRange,
		env.getExecutionConfig(),
		ttlTimeProvider,
		stateHandles,
		AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
		localRecoveryConfig,
		priorityQueueSetFactory,
		isUsingAsynchronousSnapshots(),
		cancelStreamRegistry).build();
}
 
示例17
@Override
public OperatorStateBackend createOperatorStateBackend(
	Environment env,
	String operatorIdentifier,
	@Nonnull Collection<OperatorStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {

	return new DefaultOperatorStateBackendBuilder(
		env.getUserClassLoader(),
		env.getExecutionConfig(),
		isUsingAsynchronousSnapshots(),
		stateHandles,
		cancelStreamRegistry).build();
}
 
示例18
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
	Environment env,
	JobID jobID,
	String operatorIdentifier,
	TypeSerializer<K> keySerializer,
	int numberOfKeyGroups,
	KeyGroupRange keyGroupRange,
	TaskKvStateRegistry kvStateRegistry,
	TtlTimeProvider ttlTimeProvider,
	MetricGroup metricGroup,
	@Nonnull Collection<KeyedStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {

	TaskStateManager taskStateManager = env.getTaskStateManager();
	HeapPriorityQueueSetFactory priorityQueueSetFactory =
		new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
	return new HeapKeyedStateBackendBuilder<>(
		kvStateRegistry,
		keySerializer,
		env.getUserClassLoader(),
		numberOfKeyGroups,
		keyGroupRange,
		env.getExecutionConfig(),
		ttlTimeProvider,
		stateHandles,
		AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
		taskStateManager.createLocalRecoveryConfig(),
		priorityQueueSetFactory,
		isUsingAsynchronousSnapshots(),
		cancelStreamRegistry).build();
}
 
示例19
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
	Environment env,
	JobID jobID,
	String operatorIdentifier,
	TypeSerializer<K> keySerializer,
	int numberOfKeyGroups,
	KeyGroupRange keyGroupRange,
	TaskKvStateRegistry kvStateRegistry,
	TtlTimeProvider ttlTimeProvider,
	MetricGroup metricGroup,
	@Nonnull Collection<KeyedStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {

	TaskStateManager taskStateManager = env.getTaskStateManager();
	LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
	HeapPriorityQueueSetFactory priorityQueueSetFactory =
		new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

	return new HeapKeyedStateBackendBuilder<>(
		kvStateRegistry,
		keySerializer,
		env.getUserClassLoader(),
		numberOfKeyGroups,
		keyGroupRange,
		env.getExecutionConfig(),
		ttlTimeProvider,
		stateHandles,
		AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
		localRecoveryConfig,
		priorityQueueSetFactory,
		isUsingAsynchronousSnapshots(),
		cancelStreamRegistry).build();
}
 
示例20
@Override
public OperatorStateBackend createOperatorStateBackend(
	Environment env,
	String operatorIdentifier,
	@Nonnull Collection<OperatorStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {

	return new DefaultOperatorStateBackendBuilder(
		env.getUserClassLoader(),
		env.getExecutionConfig(),
		isUsingAsynchronousSnapshots(),
		stateHandles,
		cancelStreamRegistry).build();
}
 
示例21
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
	Environment env,
	JobID jobID,
	String operatorIdentifier,
	TypeSerializer<K> keySerializer,
	int numberOfKeyGroups,
	KeyGroupRange keyGroupRange,
	TaskKvStateRegistry kvStateRegistry,
	TtlTimeProvider ttlTimeProvider,
	MetricGroup metricGroup,
	@Nonnull Collection<KeyedStateHandle> stateHandles,
	CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {

	TaskStateManager taskStateManager = env.getTaskStateManager();
	HeapPriorityQueueSetFactory priorityQueueSetFactory =
		new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
	return new HeapKeyedStateBackendBuilder<>(
		kvStateRegistry,
		keySerializer,
		env.getUserClassLoader(),
		numberOfKeyGroups,
		keyGroupRange,
		env.getExecutionConfig(),
		ttlTimeProvider,
		stateHandles,
		AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
		taskStateManager.createLocalRecoveryConfig(),
		priorityQueueSetFactory,
		isUsingAsynchronousSnapshots(),
		cancelStreamRegistry).build();
}
 
示例22
@Override
public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
	// Map of registered Key/Value states
	Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
	// Map of registered priority queue set states
	Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates = new HashMap<>();
	CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
	HeapSnapshotStrategy<K> snapshotStrategy = initSnapshotStrategy(
		asynchronousSnapshots, registeredKVStates, registeredPQStates, cancelStreamRegistryForBackend);
	HeapKeyedStateBackend<K> backend = new HeapKeyedStateBackend<>(
		kvStateRegistry,
		keySerializerProvider,
		userCodeClassLoader,
		numberOfKeyGroups,
		keyGroupRange,
		executionConfig,
		ttlTimeProvider,
		cancelStreamRegistryForBackend,
		keyGroupCompressionDecorator,
		registeredKVStates,
		registeredPQStates,
		localRecoveryConfig,
		priorityQueueSetFactory,
		snapshotStrategy
	);
	HeapRestoreOperation<K> restoreOperation = new HeapRestoreOperation<>(
		restoreStateHandles,
		keySerializerProvider,
		userCodeClassLoader,
		registeredKVStates,
		registeredPQStates,
		cancelStreamRegistry,
		priorityQueueSetFactory,
		keyGroupRange,
		numberOfKeyGroups,
		snapshotStrategy,
		backend);
	try {
		restoreOperation.restore();
	} catch (Exception e) {
		backend.dispose();
		throw new BackendBuildingException("Failed when trying to restore heap backend", e);
	}
	return backend;
}
 
示例23
@Override
public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
	// Map of registered Key/Value states
	Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
	// Map of registered priority queue set states
	Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates = new HashMap<>();
	CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
	HeapSnapshotStrategy<K> snapshotStrategy = initSnapshotStrategy(
		asynchronousSnapshots, registeredKVStates, registeredPQStates, cancelStreamRegistryForBackend);
	InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
		keyGroupRange,
		numberOfKeyGroups
	);
	HeapRestoreOperation<K> restoreOperation = new HeapRestoreOperation<>(
		restoreStateHandles,
		keySerializerProvider,
		userCodeClassLoader,
		registeredKVStates,
		registeredPQStates,
		cancelStreamRegistry,
		priorityQueueSetFactory,
		keyGroupRange,
		numberOfKeyGroups,
		snapshotStrategy,
		keyContext);
	try {
		restoreOperation.restore();
	} catch (Exception e) {
		throw new BackendBuildingException("Failed when trying to restore heap backend", e);
	}
	return new HeapKeyedStateBackend<>(
		kvStateRegistry,
		keySerializerProvider.currentSchemaSerializer(),
		userCodeClassLoader,
		executionConfig,
		ttlTimeProvider,
		cancelStreamRegistryForBackend,
		keyGroupCompressionDecorator,
		registeredKVStates,
		registeredPQStates,
		localRecoveryConfig,
		priorityQueueSetFactory,
		snapshotStrategy,
		keyContext);
}
 
示例24
@Override
public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
	// Map of registered Key/Value states
	Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
	// Map of registered priority queue set states
	Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates = new HashMap<>();
	CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
	HeapSnapshotStrategy<K> snapshotStrategy = initSnapshotStrategy(
		asynchronousSnapshots, registeredKVStates, registeredPQStates, cancelStreamRegistryForBackend);
	InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
		keyGroupRange,
		numberOfKeyGroups
	);
	HeapRestoreOperation<K> restoreOperation = new HeapRestoreOperation<>(
		restoreStateHandles,
		keySerializerProvider,
		userCodeClassLoader,
		registeredKVStates,
		registeredPQStates,
		cancelStreamRegistry,
		priorityQueueSetFactory,
		keyGroupRange,
		numberOfKeyGroups,
		snapshotStrategy,
		keyContext);
	try {
		restoreOperation.restore();
	} catch (Exception e) {
		throw new BackendBuildingException("Failed when trying to restore heap backend", e);
	}
	return new HeapKeyedStateBackend<>(
		kvStateRegistry,
		keySerializerProvider.currentSchemaSerializer(),
		userCodeClassLoader,
		executionConfig,
		ttlTimeProvider,
		cancelStreamRegistryForBackend,
		keyGroupCompressionDecorator,
		registeredKVStates,
		registeredPQStates,
		localRecoveryConfig,
		priorityQueueSetFactory,
		snapshotStrategy,
		keyContext);
}