Java源码示例:org.apache.flink.runtime.state.BackendBuildingException
示例1
/**
 * Restores this backend from exactly one incremental keyed state handle, without rescaling.
 *
 * <p>Both remote and local incremental handles are accepted. For either kind,
 * {@code restorePreviousIncrementalFilesStatus} is called first, followed by the matching
 * {@code restoreFromRemoteState} or {@code restoreFromLocalState} call.
 *
 * @param keyedStateHandle the handle to restore from
 * @throws BackendBuildingException if the handle is neither an
 *     {@code IncrementalRemoteKeyedStateHandle} nor an {@code IncrementalLocalKeyedStateHandle}
 * @throws Exception if one of the invoked restore helpers fails
 */
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
    // Case 1: the handle refers to remote incremental state.
    if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
        IncrementalRemoteKeyedStateHandle remoteHandle =
            (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
        restorePreviousIncrementalFilesStatus(remoteHandle);
        restoreFromRemoteState(remoteHandle);
        return;
    }

    // Case 2: the handle refers to local incremental state.
    if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
        IncrementalLocalKeyedStateHandle localHandle =
            (IncrementalLocalKeyedStateHandle) keyedStateHandle;
        restorePreviousIncrementalFilesStatus(localHandle);
        restoreFromLocalState(localHandle);
        return;
    }

    // Anything else is unsupported: name both accepted types and the type actually received.
    String message = "Unexpected state handle type, " +
        "expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
        ", but found " + keyedStateHandle.getClass();
    throw new BackendBuildingException(message);
}
示例2
/**
 * Restores the base DB from {@code initialHandle} and then clips it via
 * {@code RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange}, passing this backend's
 * {@code keyGroupRange} together with the key-group range reported by the handle.
 *
 * @param initialHandle the handle selected as the base; asserted to be an
 *     {@code IncrementalRemoteKeyedStateHandle}
 * @throws BackendBuildingException if clipping fails with a {@code RocksDBException}
 * @throws Exception if restoring from the remote state fails
 */
private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception {
    assert initialHandle instanceof IncrementalRemoteKeyedStateHandle;

    // Step 1: bring up the base DB from the chosen handle.
    IncrementalRemoteKeyedStateHandle baseHandle = (IncrementalRemoteKeyedStateHandle) initialHandle;
    restoreFromRemoteState(baseHandle);

    // Step 2: clip the base DB instance.
    try {
        RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
            db, columnFamilyHandles, keyGroupRange, initialHandle.getKeyGroupRange(), keyGroupPrefixBytes);
    } catch (RocksDBException clipFailure) {
        // The failure is logged here and also handed to the caller as the cause.
        final String errMsg = "Failed to clip DB after initialization.";
        LOG.error(errMsg, clipFailure);
        throw new BackendBuildingException(errMsg, clipFailure);
    }
}
示例3
/**
 * Builds a {@code HeapKeyedStateBackend} with {@code Long} keys over a
 * {@code KeyGroupRange(0, 0)} and selects {@code key} as its current key.
 *
 * <p>The backend is built with a mocked {@code TaskKvStateRegistry}, no restore handles, and
 * local recovery disabled.
 *
 * @param key the key to set as current on the freshly built backend
 * @return the built backend with its current key already set
 * @throws BackendBuildingException if the builder fails to build the backend
 */
private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final long key) throws BackendBuildingException {
    final KeyGroupRange range = new KeyGroupRange(0, 0);
    final int keyGroupCount = range.getNumberOfKeyGroups();
    final ExecutionConfig config = new ExecutionConfig();

    // NOTE(review): 128 is the third constructor argument; presumably a minimum capacity — confirm.
    final HeapPriorityQueueSetFactory queueSetFactory =
        new HeapPriorityQueueSetFactory(range, keyGroupCount, 128);

    // Objects for heap state list serialisation.
    final HeapKeyedStateBackendBuilder<Long> builder = new HeapKeyedStateBackendBuilder<>(
        mock(TaskKvStateRegistry.class),
        LongSerializer.INSTANCE,
        ClassLoader.getSystemClassLoader(),
        keyGroupCount,
        range,
        config,
        TtlTimeProvider.DEFAULT,
        Collections.emptyList(),
        AbstractStateBackend.getCompressionDecorator(config),
        TestLocalRecoveryConfig.disabled(),
        queueSetFactory,
        async,
        new CloseableRegistry());

    final HeapKeyedStateBackend<Long> backend = builder.build();
    backend.setCurrentKey(key);
    return backend;
}
示例4
/**
 * Builds an {@code OperatorStateBackend} with {@code DefaultOperatorStateBackendBuilder}, using
 * the class loader and execution config taken from {@code runtimeContext}.
 *
 * @param runtimeContext source of the user-code class loader and the execution config
 * @param stateHandles operator state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built operator state backend
 * @throws RuntimeException wrapping the {@code BackendBuildingException} if building fails
 */
private static OperatorStateBackend createOperatorStateBackend(
        RuntimeContext runtimeContext,
        Collection<OperatorStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) {
    try {
        // NOTE(review): the literal false presumably disables asynchronous snapshots — confirm
        // against the builder's constructor.
        final DefaultOperatorStateBackendBuilder builder = new DefaultOperatorStateBackendBuilder(
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getExecutionConfig(),
            false,
            stateHandles,
            cancelStreamRegistry);
        return builder.build();
    } catch (BackendBuildingException buildFailure) {
        // The signature declares no checked exceptions, so rethrow unchecked with the cause kept.
        throw new RuntimeException(buildFailure);
    }
}
示例5
/**
 * Restores this backend from exactly one incremental keyed state handle, without rescaling.
 *
 * <p>Both remote and local incremental handles are accepted. For either kind,
 * {@code restorePreviousIncrementalFilesStatus} is called first, followed by the matching
 * {@code restoreFromRemoteState} or {@code restoreFromLocalState} call.
 *
 * @param keyedStateHandle the handle to restore from
 * @throws BackendBuildingException if the handle is neither an
 *     {@code IncrementalRemoteKeyedStateHandle} nor an {@code IncrementalLocalKeyedStateHandle}
 * @throws Exception if one of the invoked restore helpers fails
 */
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
    // Case 1: the handle refers to remote incremental state.
    if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
        IncrementalRemoteKeyedStateHandle remoteHandle =
            (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
        restorePreviousIncrementalFilesStatus(remoteHandle);
        restoreFromRemoteState(remoteHandle);
        return;
    }

    // Case 2: the handle refers to local incremental state.
    if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
        IncrementalLocalKeyedStateHandle localHandle =
            (IncrementalLocalKeyedStateHandle) keyedStateHandle;
        restorePreviousIncrementalFilesStatus(localHandle);
        restoreFromLocalState(localHandle);
        return;
    }

    // Anything else is unsupported: name both accepted types and the type actually received.
    String message = "Unexpected state handle type, " +
        "expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
        ", but found " + keyedStateHandle.getClass();
    throw new BackendBuildingException(message);
}
示例6
/**
 * Restores the base DB from {@code initialHandle} and then clips it via
 * {@code RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange}, passing this backend's
 * {@code keyGroupRange} together with the key-group range reported by the handle.
 *
 * @param initialHandle the handle selected as the base; asserted to be an
 *     {@code IncrementalRemoteKeyedStateHandle}
 * @throws BackendBuildingException if clipping fails with a {@code RocksDBException}
 * @throws Exception if restoring from the remote state fails
 */
private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception {
    assert initialHandle instanceof IncrementalRemoteKeyedStateHandle;

    // Step 1: bring up the base DB from the chosen handle.
    IncrementalRemoteKeyedStateHandle baseHandle = (IncrementalRemoteKeyedStateHandle) initialHandle;
    restoreFromRemoteState(baseHandle);

    // Step 2: clip the base DB instance.
    try {
        RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
            db, columnFamilyHandles, keyGroupRange, initialHandle.getKeyGroupRange(), keyGroupPrefixBytes);
    } catch (RocksDBException clipFailure) {
        // The failure is logged here and also handed to the caller as the cause.
        final String errMsg = "Failed to clip DB after initialization.";
        LOG.error(errMsg, clipFailure);
        throw new BackendBuildingException(errMsg, clipFailure);
    }
}
示例7
/**
 * Builds a {@code HeapKeyedStateBackend} with {@code Long} keys over a
 * {@code KeyGroupRange(0, 0)} and selects {@code key} as its current key.
 *
 * <p>The backend is built with a mocked {@code TaskKvStateRegistry}, no restore handles, and
 * local recovery disabled.
 *
 * @param key the key to set as current on the freshly built backend
 * @return the built backend with its current key already set
 * @throws BackendBuildingException if the builder fails to build the backend
 */
private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final long key) throws BackendBuildingException {
    final KeyGroupRange range = new KeyGroupRange(0, 0);
    final int keyGroupCount = range.getNumberOfKeyGroups();
    final ExecutionConfig config = new ExecutionConfig();

    // NOTE(review): 128 is the third constructor argument; presumably a minimum capacity — confirm.
    final HeapPriorityQueueSetFactory queueSetFactory =
        new HeapPriorityQueueSetFactory(range, keyGroupCount, 128);

    // Objects for heap state list serialisation.
    final HeapKeyedStateBackendBuilder<Long> builder = new HeapKeyedStateBackendBuilder<>(
        mock(TaskKvStateRegistry.class),
        LongSerializer.INSTANCE,
        ClassLoader.getSystemClassLoader(),
        keyGroupCount,
        range,
        config,
        TtlTimeProvider.DEFAULT,
        Collections.emptyList(),
        AbstractStateBackend.getCompressionDecorator(config),
        TestLocalRecoveryConfig.disabled(),
        queueSetFactory,
        async,
        new CloseableRegistry());

    final HeapKeyedStateBackend<Long> backend = builder.build();
    backend.setCurrentKey(key);
    return backend;
}
示例8
/**
 * Creates the operator state backend for the given operator.
 *
 * <p>If {@code operatorIdentifier} contains {@code DECLINE_SINK_NAME}, a
 * {@code DeclineSinkFailingOperatorStateBackend} with a
 * {@code DeclineSinkFailingSnapshotStrategy} is returned. Every other operator gets the backend
 * built by {@code DefaultOperatorStateBackendBuilder}.
 *
 * @param env source of the execution config and the user class loader
 * @param operatorIdentifier identifier checked against {@code DECLINE_SINK_NAME}
 * @param stateHandles operator state handles handed to the default builder
 * @param cancelStreamRegistry registry handed to whichever backend is created
 * @return the operator state backend for this operator
 * @throws BackendBuildingException if the default builder fails to build the backend
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier,
        @Nonnull Collection<OperatorStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final boolean isDeclineSink = operatorIdentifier.contains(DECLINE_SINK_NAME);

    // Common case first: any operator that is not the decline sink gets the default backend.
    if (!isDeclineSink) {
        return new DefaultOperatorStateBackendBuilder(
            env.getUserClassLoader(),
            env.getExecutionConfig(),
            false,
            stateHandles,
            cancelStreamRegistry).build();
    }

    return new DeclineSinkFailingOperatorStateBackend(
        env.getExecutionConfig(),
        cancelStreamRegistry,
        new DeclineSinkFailingSnapshotStrategy());
}
示例9
/**
 * Builds an {@code OperatorStateBackend} with {@code DefaultOperatorStateBackendBuilder}, using
 * the class loader and execution config taken from {@code runtimeContext}.
 *
 * @param runtimeContext source of the user-code class loader and the execution config
 * @param stateHandles operator state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built operator state backend
 * @throws RuntimeException wrapping the {@code BackendBuildingException} if building fails
 */
private static OperatorStateBackend createOperatorStateBackend(
        RuntimeContext runtimeContext,
        Collection<OperatorStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) {
    try {
        // NOTE(review): the literal false presumably disables asynchronous snapshots — confirm
        // against the builder's constructor.
        final DefaultOperatorStateBackendBuilder builder = new DefaultOperatorStateBackendBuilder(
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getExecutionConfig(),
            false,
            stateHandles,
            cancelStreamRegistry);
        return builder.build();
    } catch (BackendBuildingException buildFailure) {
        // The signature declares no checked exceptions, so rethrow unchecked with the cause kept.
        throw new RuntimeException(buildFailure);
    }
}
示例10
/**
 * Restores this backend from exactly one incremental keyed state handle, without rescaling.
 *
 * <p>Both remote and local incremental handles are accepted. For either kind,
 * {@code restorePreviousIncrementalFilesStatus} is called first, followed by the matching
 * {@code restoreFromRemoteState} or {@code restoreFromLocalState} call.
 *
 * @param keyedStateHandle the handle to restore from
 * @throws BackendBuildingException if the handle is neither an
 *     {@code IncrementalRemoteKeyedStateHandle} nor an {@code IncrementalLocalKeyedStateHandle}
 * @throws Exception if one of the invoked restore helpers fails
 */
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
    // Case 1: the handle refers to remote incremental state.
    if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
        IncrementalRemoteKeyedStateHandle remoteHandle =
            (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
        restorePreviousIncrementalFilesStatus(remoteHandle);
        restoreFromRemoteState(remoteHandle);
        return;
    }

    // Case 2: the handle refers to local incremental state.
    if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
        IncrementalLocalKeyedStateHandle localHandle =
            (IncrementalLocalKeyedStateHandle) keyedStateHandle;
        restorePreviousIncrementalFilesStatus(localHandle);
        restoreFromLocalState(localHandle);
        return;
    }

    // Anything else is unsupported: name both accepted types and the type actually received.
    String message = "Unexpected state handle type, " +
        "expected " + IncrementalRemoteKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
        ", but found " + keyedStateHandle.getClass();
    throw new BackendBuildingException(message);
}
示例11
/**
 * Restores the base DB from {@code initialHandle} and then clips it via
 * {@code RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange}, passing this backend's
 * {@code keyGroupRange}, the key-group range reported by the handle, and {@code writeBatchSize}.
 *
 * @param initialHandle the handle selected as the base; asserted to be an
 *     {@code IncrementalRemoteKeyedStateHandle}
 * @throws BackendBuildingException if clipping fails with a {@code RocksDBException}
 * @throws Exception if restoring from the remote state fails
 */
private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception {
    assert initialHandle instanceof IncrementalRemoteKeyedStateHandle;

    // Step 1: bring up the base DB from the chosen handle.
    IncrementalRemoteKeyedStateHandle baseHandle = (IncrementalRemoteKeyedStateHandle) initialHandle;
    restoreFromRemoteState(baseHandle);

    // Step 2: clip the base DB instance.
    try {
        RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
            db,
            columnFamilyHandles,
            keyGroupRange,
            initialHandle.getKeyGroupRange(),
            keyGroupPrefixBytes,
            writeBatchSize);
    } catch (RocksDBException clipFailure) {
        // The failure is logged here and also handed to the caller as the cause.
        final String errMsg = "Failed to clip DB after initialization.";
        LOG.error(errMsg, clipFailure);
        throw new BackendBuildingException(errMsg, clipFailure);
    }
}
示例12
/**
 * Builds a {@code HeapKeyedStateBackend} with {@code Long} keys over a
 * {@code KeyGroupRange(0, 0)} and selects {@code key} as its current key.
 *
 * <p>The backend is built with a mocked {@code TaskKvStateRegistry}, no restore handles, and
 * local recovery disabled.
 *
 * @param key the key to set as current on the freshly built backend
 * @return the built backend with its current key already set
 * @throws BackendBuildingException if the builder fails to build the backend
 */
private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(final long key) throws BackendBuildingException {
    final KeyGroupRange range = new KeyGroupRange(0, 0);
    final int keyGroupCount = range.getNumberOfKeyGroups();
    final ExecutionConfig config = new ExecutionConfig();

    // NOTE(review): 128 is the third constructor argument; presumably a minimum capacity — confirm.
    final HeapPriorityQueueSetFactory queueSetFactory =
        new HeapPriorityQueueSetFactory(range, keyGroupCount, 128);

    // Objects for heap state list serialisation.
    final HeapKeyedStateBackendBuilder<Long> builder = new HeapKeyedStateBackendBuilder<>(
        mock(TaskKvStateRegistry.class),
        LongSerializer.INSTANCE,
        ClassLoader.getSystemClassLoader(),
        keyGroupCount,
        range,
        config,
        TtlTimeProvider.DEFAULT,
        Collections.emptyList(),
        AbstractStateBackend.getCompressionDecorator(config),
        TestLocalRecoveryConfig.disabled(),
        queueSetFactory,
        async,
        new CloseableRegistry());

    final HeapKeyedStateBackend<Long> backend = builder.build();
    backend.setCurrentKey(key);
    return backend;
}
示例13
/**
 * Creates a heap-based keyed state backend through {@code HeapKeyedStateBackendBuilder}.
 *
 * <p>The local recovery config comes from the environment's task state manager, and the
 * asynchronous-snapshot flag from {@code isUsingAsynchronousSnapshots()}. The parameters
 * {@code jobID}, {@code operatorIdentifier} and {@code metricGroup} are not used here.
 *
 * @param env source of the task state manager, user class loader and execution config
 * @param keySerializer serializer for the keys
 * @param numberOfKeyGroups number of key groups handed to the builder
 * @param keyGroupRange key-group range handed to the builder
 * @param kvStateRegistry registry handed to the builder
 * @param ttlTimeProvider time provider handed to the builder
 * @param stateHandles keyed state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built keyed state backend
 * @throws BackendBuildingException if the builder fails to build the backend
 */
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final LocalRecoveryConfig recoveryConfig =
        env.getTaskStateManager().createLocalRecoveryConfig();

    // NOTE(review): 128 is the third constructor argument; presumably a minimum capacity — confirm.
    final HeapPriorityQueueSetFactory queueSetFactory =
        new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

    final HeapKeyedStateBackendBuilder<K> builder = new HeapKeyedStateBackendBuilder<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        env.getExecutionConfig(),
        ttlTimeProvider,
        stateHandles,
        AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
        recoveryConfig,
        queueSetFactory,
        isUsingAsynchronousSnapshots(),
        cancelStreamRegistry);
    return builder.build();
}
示例14
/**
 * Creates the operator state backend through {@code DefaultOperatorStateBackendBuilder}.
 *
 * <p>The asynchronous-snapshot flag comes from {@code isUsingAsynchronousSnapshots()}.
 * {@code operatorIdentifier} is not used here.
 *
 * @param env source of the user class loader and the execution config
 * @param stateHandles operator state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built operator state backend
 * @throws BackendBuildingException if the builder fails to build the backend
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier,
        @Nonnull Collection<OperatorStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final DefaultOperatorStateBackendBuilder builder = new DefaultOperatorStateBackendBuilder(
        env.getUserClassLoader(),
        env.getExecutionConfig(),
        isUsingAsynchronousSnapshots(),
        stateHandles,
        cancelStreamRegistry);
    return builder.build();
}
示例15
/**
 * Creates a heap-based keyed state backend through {@code HeapKeyedStateBackendBuilder}.
 *
 * <p>The local recovery config comes from the environment's task state manager, and the
 * asynchronous-snapshot flag from {@code isUsingAsynchronousSnapshots()}. The parameters
 * {@code jobID}, {@code operatorIdentifier} and {@code metricGroup} are not used here.
 *
 * @param env source of the task state manager, user class loader and execution config
 * @param keySerializer serializer for the keys
 * @param numberOfKeyGroups number of key groups handed to the builder
 * @param keyGroupRange key-group range handed to the builder
 * @param kvStateRegistry registry handed to the builder
 * @param ttlTimeProvider time provider handed to the builder
 * @param stateHandles keyed state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built keyed state backend
 * @throws BackendBuildingException if the builder fails to build the backend
 */
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final TaskStateManager stateManager = env.getTaskStateManager();

    // NOTE(review): 128 is the third constructor argument; presumably a minimum capacity — confirm.
    final HeapPriorityQueueSetFactory queueSetFactory =
        new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

    final HeapKeyedStateBackendBuilder<K> builder = new HeapKeyedStateBackendBuilder<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        env.getExecutionConfig(),
        ttlTimeProvider,
        stateHandles,
        AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
        stateManager.createLocalRecoveryConfig(),
        queueSetFactory,
        isUsingAsynchronousSnapshots(),
        cancelStreamRegistry);
    return builder.build();
}
示例16
/**
 * Creates a heap-based keyed state backend through {@code HeapKeyedStateBackendBuilder}.
 *
 * <p>The local recovery config comes from the environment's task state manager, and the
 * asynchronous-snapshot flag from {@code isUsingAsynchronousSnapshots()}. The parameters
 * {@code jobID}, {@code operatorIdentifier} and {@code metricGroup} are not used here.
 *
 * @param env source of the task state manager, user class loader and execution config
 * @param keySerializer serializer for the keys
 * @param numberOfKeyGroups number of key groups handed to the builder
 * @param keyGroupRange key-group range handed to the builder
 * @param kvStateRegistry registry handed to the builder
 * @param ttlTimeProvider time provider handed to the builder
 * @param stateHandles keyed state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built keyed state backend
 * @throws BackendBuildingException if the builder fails to build the backend
 */
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final LocalRecoveryConfig recoveryConfig =
        env.getTaskStateManager().createLocalRecoveryConfig();

    // NOTE(review): 128 is the third constructor argument; presumably a minimum capacity — confirm.
    final HeapPriorityQueueSetFactory queueSetFactory =
        new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

    final HeapKeyedStateBackendBuilder<K> builder = new HeapKeyedStateBackendBuilder<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        env.getExecutionConfig(),
        ttlTimeProvider,
        stateHandles,
        AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
        recoveryConfig,
        queueSetFactory,
        isUsingAsynchronousSnapshots(),
        cancelStreamRegistry);
    return builder.build();
}
示例17
/**
 * Creates the operator state backend through {@code DefaultOperatorStateBackendBuilder}.
 *
 * <p>The asynchronous-snapshot flag comes from {@code isUsingAsynchronousSnapshots()}.
 * {@code operatorIdentifier} is not used here.
 *
 * @param env source of the user class loader and the execution config
 * @param stateHandles operator state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built operator state backend
 * @throws BackendBuildingException if the builder fails to build the backend
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier,
        @Nonnull Collection<OperatorStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final DefaultOperatorStateBackendBuilder builder = new DefaultOperatorStateBackendBuilder(
        env.getUserClassLoader(),
        env.getExecutionConfig(),
        isUsingAsynchronousSnapshots(),
        stateHandles,
        cancelStreamRegistry);
    return builder.build();
}
示例18
/**
 * Creates a heap-based keyed state backend through {@code HeapKeyedStateBackendBuilder}.
 *
 * <p>The local recovery config comes from the environment's task state manager, and the
 * asynchronous-snapshot flag from {@code isUsingAsynchronousSnapshots()}. The parameters
 * {@code jobID}, {@code operatorIdentifier} and {@code metricGroup} are not used here.
 *
 * @param env source of the task state manager, user class loader and execution config
 * @param keySerializer serializer for the keys
 * @param numberOfKeyGroups number of key groups handed to the builder
 * @param keyGroupRange key-group range handed to the builder
 * @param kvStateRegistry registry handed to the builder
 * @param ttlTimeProvider time provider handed to the builder
 * @param stateHandles keyed state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built keyed state backend
 * @throws BackendBuildingException if the builder fails to build the backend
 */
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final TaskStateManager stateManager = env.getTaskStateManager();

    // NOTE(review): 128 is the third constructor argument; presumably a minimum capacity — confirm.
    final HeapPriorityQueueSetFactory queueSetFactory =
        new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

    final HeapKeyedStateBackendBuilder<K> builder = new HeapKeyedStateBackendBuilder<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        env.getExecutionConfig(),
        ttlTimeProvider,
        stateHandles,
        AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
        stateManager.createLocalRecoveryConfig(),
        queueSetFactory,
        isUsingAsynchronousSnapshots(),
        cancelStreamRegistry);
    return builder.build();
}
示例19
/**
 * Creates a heap-based keyed state backend through {@code HeapKeyedStateBackendBuilder}.
 *
 * <p>The local recovery config comes from the environment's task state manager, and the
 * asynchronous-snapshot flag from {@code isUsingAsynchronousSnapshots()}. The parameters
 * {@code jobID}, {@code operatorIdentifier} and {@code metricGroup} are not used here.
 *
 * @param env source of the task state manager, user class loader and execution config
 * @param keySerializer serializer for the keys
 * @param numberOfKeyGroups number of key groups handed to the builder
 * @param keyGroupRange key-group range handed to the builder
 * @param kvStateRegistry registry handed to the builder
 * @param ttlTimeProvider time provider handed to the builder
 * @param stateHandles keyed state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built keyed state backend
 * @throws BackendBuildingException if the builder fails to build the backend
 */
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final LocalRecoveryConfig recoveryConfig =
        env.getTaskStateManager().createLocalRecoveryConfig();

    // NOTE(review): 128 is the third constructor argument; presumably a minimum capacity — confirm.
    final HeapPriorityQueueSetFactory queueSetFactory =
        new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

    final HeapKeyedStateBackendBuilder<K> builder = new HeapKeyedStateBackendBuilder<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        env.getExecutionConfig(),
        ttlTimeProvider,
        stateHandles,
        AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
        recoveryConfig,
        queueSetFactory,
        isUsingAsynchronousSnapshots(),
        cancelStreamRegistry);
    return builder.build();
}
示例20
/**
 * Creates the operator state backend through {@code DefaultOperatorStateBackendBuilder}.
 *
 * <p>The asynchronous-snapshot flag comes from {@code isUsingAsynchronousSnapshots()}.
 * {@code operatorIdentifier} is not used here.
 *
 * @param env source of the user class loader and the execution config
 * @param stateHandles operator state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built operator state backend
 * @throws BackendBuildingException if the builder fails to build the backend
 */
@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier,
        @Nonnull Collection<OperatorStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final DefaultOperatorStateBackendBuilder builder = new DefaultOperatorStateBackendBuilder(
        env.getUserClassLoader(),
        env.getExecutionConfig(),
        isUsingAsynchronousSnapshots(),
        stateHandles,
        cancelStreamRegistry);
    return builder.build();
}
示例21
/**
 * Creates a heap-based keyed state backend through {@code HeapKeyedStateBackendBuilder}.
 *
 * <p>The local recovery config comes from the environment's task state manager, and the
 * asynchronous-snapshot flag from {@code isUsingAsynchronousSnapshots()}. The parameters
 * {@code jobID}, {@code operatorIdentifier} and {@code metricGroup} are not used here.
 *
 * @param env source of the task state manager, user class loader and execution config
 * @param keySerializer serializer for the keys
 * @param numberOfKeyGroups number of key groups handed to the builder
 * @param keyGroupRange key-group range handed to the builder
 * @param kvStateRegistry registry handed to the builder
 * @param ttlTimeProvider time provider handed to the builder
 * @param stateHandles keyed state handles handed to the builder
 * @param cancelStreamRegistry registry handed to the builder
 * @return the built keyed state backend
 * @throws BackendBuildingException if the builder fails to build the backend
 */
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
    final TaskStateManager stateManager = env.getTaskStateManager();

    // NOTE(review): 128 is the third constructor argument; presumably a minimum capacity — confirm.
    final HeapPriorityQueueSetFactory queueSetFactory =
        new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

    final HeapKeyedStateBackendBuilder<K> builder = new HeapKeyedStateBackendBuilder<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        env.getExecutionConfig(),
        ttlTimeProvider,
        stateHandles,
        AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
        stateManager.createLocalRecoveryConfig(),
        queueSetFactory,
        isUsingAsynchronousSnapshots(),
        cancelStreamRegistry);
    return builder.build();
}
示例22
/**
 * Builds a {@code HeapKeyedStateBackend} and restores its state from the configured restore
 * handles.
 *
 * <p>The backend is constructed first and handed to the {@code HeapRestoreOperation}. If the
 * restore fails, the half-built backend is disposed before the failure is reported. A failure
 * during that disposal is attached to the restore failure as a suppressed exception, so it
 * cannot replace the original cause.
 *
 * @return the fully built and restored backend
 * @throws BackendBuildingException if restoring the state fails
 */
@Override
public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
    // Map of registered Key/Value states
    Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
    // Map of registered priority queue set states
    Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates = new HashMap<>();
    // The backend gets its own registry, separate from the builder's cancelStreamRegistry
    // that is handed to the restore operation below.
    CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
    HeapSnapshotStrategy<K> snapshotStrategy = initSnapshotStrategy(
        asynchronousSnapshots, registeredKVStates, registeredPQStates, cancelStreamRegistryForBackend);
    HeapKeyedStateBackend<K> backend = new HeapKeyedStateBackend<>(
        kvStateRegistry,
        keySerializerProvider,
        userCodeClassLoader,
        numberOfKeyGroups,
        keyGroupRange,
        executionConfig,
        ttlTimeProvider,
        cancelStreamRegistryForBackend,
        keyGroupCompressionDecorator,
        registeredKVStates,
        registeredPQStates,
        localRecoveryConfig,
        priorityQueueSetFactory,
        snapshotStrategy
    );
    HeapRestoreOperation<K> restoreOperation = new HeapRestoreOperation<>(
        restoreStateHandles,
        keySerializerProvider,
        userCodeClassLoader,
        registeredKVStates,
        registeredPQStates,
        cancelStreamRegistry,
        priorityQueueSetFactory,
        keyGroupRange,
        numberOfKeyGroups,
        snapshotStrategy,
        backend);
    try {
        restoreOperation.restore();
    } catch (Exception e) {
        // Release the half-built backend, but never let a cleanup failure hide the restore
        // failure the caller needs to see.
        try {
            backend.dispose();
        } catch (Exception disposeFailure) {
            e.addSuppressed(disposeFailure);
        }
        throw new BackendBuildingException("Failed when trying to restore heap backend", e);
    }
    return backend;
}
示例23
/**
 * Restores state through a {@code HeapRestoreOperation} and then creates the
 * {@code HeapKeyedStateBackend} on top of the restored state maps.
 *
 * <p>The restore operation and the backend share the same registered-state maps, snapshot
 * strategy and key context. The backend is only constructed after the restore has succeeded.
 *
 * @return the backend created after a successful restore
 * @throws BackendBuildingException if restoring the state fails
 */
@Override
public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
    // Registered key/value states, keyed by name.
    final Map<String, StateTable<K, ?, ?>> kvStates = new HashMap<>();
    // Registered priority-queue set states, keyed by name.
    final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> pqStates = new HashMap<>();

    final InternalKeyContext<K> keyContext =
        new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);

    // The backend gets its own registry; the restore uses the builder's cancelStreamRegistry.
    final CloseableRegistry backendRegistry = new CloseableRegistry();
    final HeapSnapshotStrategy<K> strategy =
        initSnapshotStrategy(asynchronousSnapshots, kvStates, pqStates, backendRegistry);

    final HeapRestoreOperation<K> restore = new HeapRestoreOperation<>(
        restoreStateHandles,
        keySerializerProvider,
        userCodeClassLoader,
        kvStates,
        pqStates,
        cancelStreamRegistry,
        priorityQueueSetFactory,
        keyGroupRange,
        numberOfKeyGroups,
        strategy,
        keyContext);
    try {
        restore.restore();
    } catch (Exception restoreFailure) {
        throw new BackendBuildingException("Failed when trying to restore heap backend", restoreFailure);
    }

    // The current schema serializer is read only after the restore has completed.
    final HeapKeyedStateBackend<K> backend = new HeapKeyedStateBackend<>(
        kvStateRegistry,
        keySerializerProvider.currentSchemaSerializer(),
        userCodeClassLoader,
        executionConfig,
        ttlTimeProvider,
        backendRegistry,
        keyGroupCompressionDecorator,
        kvStates,
        pqStates,
        localRecoveryConfig,
        priorityQueueSetFactory,
        strategy,
        keyContext);
    return backend;
}
示例24
/**
 * Restores state through a {@code HeapRestoreOperation} and then creates the
 * {@code HeapKeyedStateBackend} on top of the restored state maps.
 *
 * <p>The restore operation and the backend share the same registered-state maps, snapshot
 * strategy and key context. The backend is only constructed after the restore has succeeded.
 *
 * @return the backend created after a successful restore
 * @throws BackendBuildingException if restoring the state fails
 */
@Override
public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
    // Registered key/value states, keyed by name.
    final Map<String, StateTable<K, ?, ?>> kvStates = new HashMap<>();
    // Registered priority-queue set states, keyed by name.
    final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> pqStates = new HashMap<>();

    final InternalKeyContext<K> keyContext =
        new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);

    // The backend gets its own registry; the restore uses the builder's cancelStreamRegistry.
    final CloseableRegistry backendRegistry = new CloseableRegistry();
    final HeapSnapshotStrategy<K> strategy =
        initSnapshotStrategy(asynchronousSnapshots, kvStates, pqStates, backendRegistry);

    final HeapRestoreOperation<K> restore = new HeapRestoreOperation<>(
        restoreStateHandles,
        keySerializerProvider,
        userCodeClassLoader,
        kvStates,
        pqStates,
        cancelStreamRegistry,
        priorityQueueSetFactory,
        keyGroupRange,
        numberOfKeyGroups,
        strategy,
        keyContext);
    try {
        restore.restore();
    } catch (Exception restoreFailure) {
        throw new BackendBuildingException("Failed when trying to restore heap backend", restoreFailure);
    }

    // The current schema serializer is read only after the restore has completed.
    final HeapKeyedStateBackend<K> backend = new HeapKeyedStateBackend<>(
        kvStateRegistry,
        keySerializerProvider.currentSchemaSerializer(),
        userCodeClassLoader,
        executionConfig,
        ttlTimeProvider,
        backendRegistry,
        keyGroupCompressionDecorator,
        kvStates,
        pqStates,
        localRecoveryConfig,
        priorityQueueSetFactory,
        strategy,
        keyContext);
    return backend;
}