Java源码示例:org.apache.flink.runtime.state.CheckpointedStateScope
示例1
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
long checkpointId,
CheckpointStreamFactory primaryStreamFactory,
CheckpointOptions checkpointOptions) {
return localRecoveryConfig.isLocalRecoveryEnabled() &&
(CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?
() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider()) :
() -> CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory);
}
示例2
/**
* Test that the exception arose in the thread pool will rethrow to the main thread.
*/
@Test
public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states.");
CheckpointStreamFactory.CheckpointStateOutputStream outputStream = createFailingCheckpointStateOutputStream(expectedException);
CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream;
File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
generateRandomFileContent(file.getPath(), 20);
Map<StateHandleID, Path> filePaths = new HashMap<>(1);
filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath()));
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
rocksDBStateUploader.uploadFilesToCheckpointFs(filePaths, checkpointStreamFactory, new CloseableRegistry());
fail();
} catch (Exception e) {
assertEquals(expectedException, e);
}
}
示例3
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
long checkpointId,
CheckpointStreamFactory primaryStreamFactory,
CheckpointOptions checkpointOptions) {
return localRecoveryConfig.isLocalRecoveryEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ?
() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider()) :
() -> CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory);
}
示例4
/**
* Test that the exception arose in the thread pool will rethrow to the main thread.
*/
@Test
public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states.");
CheckpointStreamFactory.CheckpointStateOutputStream outputStream = createFailingCheckpointStateOutputStream(expectedException);
CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream;
File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
generateRandomFileContent(file.getPath(), 20);
Map<StateHandleID, Path> filePaths = new HashMap<>(1);
filePaths.put(new StateHandleID("mockHandleID"), new Path(file.getPath()));
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
rocksDBStateUploader.uploadFilesToCheckpointFs(filePaths, checkpointStreamFactory, new CloseableRegistry());
fail();
} catch (Exception e) {
assertEquals(expectedException, e);
}
}
示例5
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
long checkpointId,
CheckpointStreamFactory primaryStreamFactory,
CheckpointOptions checkpointOptions) {
return localRecoveryConfig.isLocalRecoveryEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ?
() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider()) :
() -> CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory);
}
示例6
/**
* Test that the exception arose in the thread pool will rethrow to the main thread.
*/
@Test
public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread upload states.");
CheckpointStreamFactory.CheckpointStateOutputStream outputStream = createFailingCheckpointStateOutputStream(expectedException);
CheckpointStreamFactory checkpointStreamFactory = (CheckpointedStateScope scope) -> outputStream;
File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
generateRandomFileContent(file.getPath(), 20);
Map<StateHandleID, Path> filePaths = new HashMap<>(1);
filePaths.put(new StateHandleID("mockHandleID"), file.toPath());
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
rocksDBStateUploader.uploadFilesToCheckpointFs(filePaths, checkpointStreamFactory, new CloseableRegistry());
fail();
} catch (Exception e) {
assertEquals(expectedException, e);
}
}
示例7
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);
}
示例8
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
CheckpointedStateScope scope) throws IOException {
BlockingCheckpointOutputStream blockingStream = new BlockingCheckpointOutputStream(
new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize),
waiter,
blocker,
afterNumberInvocations);
allCreatedStreams.add(blockingStream);
return blockingStream;
}
示例9
/**
* Tests that closing the StateSnapshotContextSynchronousImpl will also close the associated
* output streams.
*/
@Test
public void testStreamClosingWhenClosing() throws Exception {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
checkpointId,
checkpointTimestamp,
streamFactory,
keyGroupRange,
closableRegistry);
// creating the output streams
context.getRawKeyedOperatorStateOutput();
context.getRawOperatorStateOutput();
verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
assertEquals(2, closableRegistry.size());
assertTrue(closableRegistry.contains(outputStream1));
assertTrue(closableRegistry.contains(outputStream2));
context.close();
verify(outputStream1).close();
verify(outputStream2).close();
assertEquals(0, closableRegistry.size());
}
示例10
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
int bufferSize = Math.max(writeBufferSize, fileStateThreshold);
return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);
}
示例11
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
CheckpointedStateScope scope) throws IOException {
BlockingCheckpointOutputStream blockingStream = new BlockingCheckpointOutputStream(
new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize),
waiter,
blocker,
afterNumberInvocations);
allCreatedStreams.add(blockingStream);
return blockingStream;
}
示例12
@Test
public void testStreamClosingExceptionally() throws Exception {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
checkpointId,
checkpointTimestamp,
streamFactory,
keyGroupRange,
closableRegistry);
// creating the output streams
context.getRawKeyedOperatorStateOutput();
context.getRawOperatorStateOutput();
verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
assertEquals(2, closableRegistry.size());
assertTrue(closableRegistry.contains(outputStream1));
assertTrue(closableRegistry.contains(outputStream2));
context.closeExceptionally();
verify(outputStream1).close();
verify(outputStream2).close();
assertEquals(0, closableRegistry.size());
}
示例13
private StateObjectCollection<OperatorStateHandle> transformSubtaskOpState(Path outDir, Integer subtaskId,
StateObjectCollection<OperatorStateHandle> baseState) {
if (transformer == null) {
return baseState;
}
StateObjectCollection<OperatorStateHandle> opHandle = baseState;
try (OperatorStateBackend opBackend = OperatorStateReader
.restoreOperatorStateBackend(opHandle)) {
transformer.accept(subtaskId, opBackend);
OperatorStateHandle newSnapshot = opBackend
.snapshot(checkpointId, System.currentTimeMillis(), new CheckpointStreamFactory() {
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
CheckpointedStateScope scope)
throws IOException {
return new FileBasedStateOutputStream(outDir.getFileSystem(),
new Path(outDir, String.valueOf(UUID.randomUUID())));
}
}, null).get().getJobManagerOwnedSnapshot();
return new StateObjectCollection<>(Lists.newArrayList(newSnapshot));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
示例14
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
int bufferSize = Math.max(writeBufferSize, fileStateThreshold);
final boolean absolutePath = entropyInjecting || scope == CheckpointedStateScope.SHARED;
return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold, !absolutePath);
}
示例15
@Test
@SuppressWarnings("ConstantConditions")
public void testWriteFlushesIfAboveThreshold() throws IOException {
int fileSizeThreshold = 100;
final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem(), fileSizeThreshold, fileSizeThreshold);
final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
stream.write(new byte[fileSizeThreshold]);
File[] files = new File(exclusiveStateDir.toUri()).listFiles();
assertEquals(1, files.length);
File file = files[0];
assertEquals(fileSizeThreshold, file.length());
stream.write(new byte[fileSizeThreshold - 1]); // should buffer without flushing
stream.write(127); // should buffer without flushing
assertEquals(fileSizeThreshold, file.length());
}
示例16
@Test
public void testExclusiveStateHasRelativePathHandles() throws IOException {
final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem(), 0);
final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
stream.write(1657);
final StreamStateHandle handle = stream.closeAndGetHandle();
assertThat(handle, instanceOf(RelativeFileStateHandle.class));
assertPathsEqual(exclusiveStateDir, ((RelativeFileStateHandle) handle).getFilePath().getParent());
}
示例17
@Test
public void testSharedStateHasAbsolutePathHandles() throws IOException {
final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem(), 0);
final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
factory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
stream.write(0);
final StreamStateHandle handle = stream.closeAndGetHandle();
assertThat(handle, instanceOf(FileStateHandle.class));
assertThat(handle, not(instanceOf(RelativeFileStateHandle.class)));
assertPathsEqual(sharedStateDir, ((FileStateHandle) handle).getFilePath().getParent());
}
示例18
@Test
public void testEntropyMakesExclusiveStateAbsolutePaths() throws IOException{
final FsCheckpointStreamFactory factory = createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), 0);
final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
stream.write(0);
final StreamStateHandle handle = stream.closeAndGetHandle();
assertThat(handle, instanceOf(FileStateHandle.class));
assertThat(handle, not(instanceOf(RelativeFileStateHandle.class)));
assertPathsEqual(exclusiveStateDir, ((FileStateHandle) handle).getFilePath().getParent());
}
示例19
private void flushAndVerify(int minFileSize, int bytesToFlush, boolean expectEmpty) throws IOException {
FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), minFileSize)
.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
stream.write(new byte[bytesToFlush], 0, bytesToFlush);
stream.flush();
assertEquals(expectEmpty ? 0 : 1, new File(exclusiveStateDir.toUri()).listFiles().length);
}
示例20
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
CheckpointedStateScope scope) throws IOException {
BlockingCheckpointOutputStream blockingStream = new BlockingCheckpointOutputStream(
new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize),
waiter,
blocker,
afterNumberInvocations);
allCreatedStreams.add(blockingStream);
return blockingStream;
}
示例21
@Test
public void testStreamClosingExceptionally() throws Exception {
long checkpointId = 42L;
long checkpointTimestamp = 1L;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
checkpointId,
checkpointTimestamp,
streamFactory,
keyGroupRange,
closableRegistry);
// creating the output streams
context.getRawKeyedOperatorStateOutput();
context.getRawOperatorStateOutput();
verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
assertEquals(2, closableRegistry.size());
assertTrue(closableRegistry.contains(outputStream1));
assertTrue(closableRegistry.contains(outputStream2));
context.closeExceptionally();
verify(outputStream1).close();
verify(outputStream2).close();
assertEquals(0, closableRegistry.size());
}
示例22
private StreamStateHandle uploadLocalFileToCheckpointFs(
Path filePath,
CheckpointStreamFactory checkpointStreamFactory,
CloseableRegistry closeableRegistry) throws IOException {
FSDataInputStream inputStream = null;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
try {
final byte[] buffer = new byte[READ_BUFFER_SIZE];
FileSystem backupFileSystem = filePath.getFileSystem();
inputStream = backupFileSystem.open(filePath);
closeableRegistry.registerCloseable(inputStream);
outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
closeableRegistry.registerCloseable(outputStream);
while (true) {
int numBytes = inputStream.read(buffer);
if (numBytes == -1) {
break;
}
outputStream.write(buffer, 0, numBytes);
}
StreamStateHandle result = null;
if (closeableRegistry.unregisterCloseable(outputStream)) {
result = outputStream.closeAndGetHandle();
outputStream = null;
}
return result;
} finally {
if (closeableRegistry.unregisterCloseable(inputStream)) {
IOUtils.closeQuietly(inputStream);
}
if (closeableRegistry.unregisterCloseable(outputStream)) {
IOUtils.closeQuietly(outputStream);
}
}
}
示例23
@Nonnull
private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
CheckpointStreamWithResultProvider streamWithResultProvider =
localRecoveryConfig.isLocalRecoveryEnabled() ?
CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
checkpointStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider()) :
CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE,
checkpointStreamFactory);
snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);
try {
//no need for compression scheme support because sst-files are already compressed
KeyedBackendSerializationProxy<K> serializationProxy =
new KeyedBackendSerializationProxy<>(
keySerializer,
stateMetaInfoSnapshots,
false);
DataOutputView out =
new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());
serializationProxy.write(out);
if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
SnapshotResult<StreamStateHandle> result =
streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
streamWithResultProvider = null;
return result;
} else {
throw new IOException("Stream already closed and cannot return a handle.");
}
} finally {
if (streamWithResultProvider != null) {
if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
IOUtils.closeQuietly(streamWithResultProvider);
}
}
}
}
示例24
@Override
public CheckpointStateOutputStream get() throws IOException {
return factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
}
示例25
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
CheckpointedStateScope scope) throws IOException
{
return new MemoryCheckpointOutputStream(maxStateSize);
}
示例26
@Test
public void testDirectoriesForExclusiveAndSharedState() throws Exception {
final FileSystem fs = LocalFileSystem.getSharedInstance();
final Path checkpointDir = randomTempPath();
final Path sharedStateDir = randomTempPath();
FsCheckpointStorageLocation storageLocation = new FsCheckpointStorageLocation(
fs,
checkpointDir,
sharedStateDir,
randomTempPath(),
CheckpointStorageLocationReference.getDefault(),
FILE_SIZE_THRESHOLD);
assertNotEquals(storageLocation.getCheckpointDirectory(), storageLocation.getSharedStateDirectory());
assertEquals(0, fs.listStatus(storageLocation.getCheckpointDirectory()).length);
assertEquals(0, fs.listStatus(storageLocation.getSharedStateDirectory()).length);
// create exclusive state
CheckpointStateOutputStream exclusiveStream =
storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
exclusiveStream.write(42);
exclusiveStream.flush();
StreamStateHandle exclusiveHandle = exclusiveStream.closeAndGetHandle();
assertEquals(1, fs.listStatus(storageLocation.getCheckpointDirectory()).length);
assertEquals(0, fs.listStatus(storageLocation.getSharedStateDirectory()).length);
// create shared state
CheckpointStateOutputStream sharedStream =
storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
sharedStream.write(42);
sharedStream.flush();
StreamStateHandle sharedHandle = sharedStream.closeAndGetHandle();
assertEquals(1, fs.listStatus(storageLocation.getCheckpointDirectory()).length);
assertEquals(1, fs.listStatus(storageLocation.getSharedStateDirectory()).length);
// drop state
exclusiveHandle.discardState();
sharedHandle.discardState();
}
示例27
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) {
return supplier.get();
}
示例28
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
return streamFactory.get();
}
示例29
private StreamStateHandle uploadLocalFileToCheckpointFs(
Path filePath,
CheckpointStreamFactory checkpointStreamFactory,
CloseableRegistry closeableRegistry) throws IOException {
FSDataInputStream inputStream = null;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
try {
final byte[] buffer = new byte[READ_BUFFER_SIZE];
FileSystem backupFileSystem = filePath.getFileSystem();
inputStream = backupFileSystem.open(filePath);
closeableRegistry.registerCloseable(inputStream);
outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
closeableRegistry.registerCloseable(outputStream);
while (true) {
int numBytes = inputStream.read(buffer);
if (numBytes == -1) {
break;
}
outputStream.write(buffer, 0, numBytes);
}
StreamStateHandle result = null;
if (closeableRegistry.unregisterCloseable(outputStream)) {
result = outputStream.closeAndGetHandle();
outputStream = null;
}
return result;
} finally {
if (closeableRegistry.unregisterCloseable(inputStream)) {
IOUtils.closeQuietly(inputStream);
}
if (closeableRegistry.unregisterCloseable(outputStream)) {
IOUtils.closeQuietly(outputStream);
}
}
}
示例30
@Nonnull
private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
CheckpointStreamWithResultProvider streamWithResultProvider =
localRecoveryConfig.isLocalRecoveryEnabled() ?
CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
checkpointStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider()) :
CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE,
checkpointStreamFactory);
snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);
try {
//no need for compression scheme support because sst-files are already compressed
KeyedBackendSerializationProxy<K> serializationProxy =
new KeyedBackendSerializationProxy<>(
keySerializer,
stateMetaInfoSnapshots,
false);
DataOutputView out =
new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());
serializationProxy.write(out);
if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
SnapshotResult<StreamStateHandle> result =
streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
streamWithResultProvider = null;
return result;
} else {
throw new IOException("Stream already closed and cannot return a handle.");
}
} finally {
if (streamWithResultProvider != null) {
if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
IOUtils.closeQuietly(streamWithResultProvider);
}
}
}
}