Java源码示例:org.apache.flink.runtime.state.CheckpointedStateScope

示例1
/**
 * Builds the supplier that creates the checkpoint output stream when it is invoked.
 *
 * <p>A duplicating stream (which additionally receives the local state directory provider) is
 * chosen only when local recovery is enabled and the checkpoint type is not
 * {@code CheckpointType.SAVEPOINT}; in every other case a simple stream over the primary factory
 * is used. Both variants request the {@code CheckpointedStateScope.EXCLUSIVE} scope.
 *
 * @param checkpointId id passed to the duplicating stream
 * @param primaryStreamFactory stream factory passed to either stream variant
 * @param checkpointOptions options whose checkpoint type is inspected
 * @return supplier producing the stream provider on demand
 */
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
	long checkpointId,
	CheckpointStreamFactory primaryStreamFactory,
	CheckpointOptions checkpointOptions) {

	// The checkpoint type is only inspected when local recovery is enabled (short-circuit).
	if (!localRecoveryConfig.isLocalRecoveryEnabled()
		|| CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {

		return () -> CheckpointStreamWithResultProvider.createSimpleStream(
			CheckpointedStateScope.EXCLUSIVE,
			primaryStreamFactory);
	}

	return () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
		checkpointId,
		CheckpointedStateScope.EXCLUSIVE,
		primaryStreamFactory,
		localRecoveryConfig.getLocalStateDirectoryProvider());
}
 
示例2
/**
 * Verifies that an exception raised in the uploader's thread pool is rethrown to the calling
 * thread.
 */
@Test
public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
	final SpecifiedException uploadFailure =
		new SpecifiedException("throw exception while multi thread upload states.");

	// The factory always hands out the stream built by createFailingCheckpointStateOutputStream.
	final CheckpointStreamFactory.CheckpointStateOutputStream failingStream =
		createFailingCheckpointStateOutputStream(uploadFailure);
	final CheckpointStreamFactory failingFactory = (CheckpointedStateScope scope) -> failingStream;

	final File localFile = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
	generateRandomFileContent(localFile.getPath(), 20);

	final Map<StateHandleID, Path> toUpload = new HashMap<>(1);
	toUpload.put(new StateHandleID("mockHandleID"), new Path(localFile.getPath()));

	try (RocksDBStateUploader uploader = new RocksDBStateUploader(5)) {
		uploader.uploadFilesToCheckpointFs(toUpload, failingFactory, new CloseableRegistry());
		// Reaching this line means the upload did not fail as expected.
		fail();
	} catch (Exception e) {
		assertEquals(uploadFailure, e);
	}
}
 
示例3
/**
 * Builds the supplier that creates the checkpoint output stream when it is invoked.
 *
 * <p>When local recovery is enabled and the checkpoint type does not report itself as a
 * savepoint, the supplier creates a duplicating stream that also receives the local state
 * directory provider. Otherwise it creates a simple stream. Both variants request the
 * {@code CheckpointedStateScope.EXCLUSIVE} scope.
 *
 * @param checkpointId id passed to the duplicating stream
 * @param primaryStreamFactory stream factory passed to either stream variant
 * @param checkpointOptions options whose checkpoint type is inspected
 * @return supplier producing the stream provider on demand
 */
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
	long checkpointId,
	CheckpointStreamFactory primaryStreamFactory,
	CheckpointOptions checkpointOptions) {

	// The checkpoint type is only consulted when local recovery is enabled (short-circuit).
	final boolean duplicateToLocalState =
		localRecoveryConfig.isLocalRecoveryEnabled() && !checkpointOptions.getCheckpointType().isSavepoint();

	if (duplicateToLocalState) {
		return () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
			checkpointId,
			CheckpointedStateScope.EXCLUSIVE,
			primaryStreamFactory,
			localRecoveryConfig.getLocalStateDirectoryProvider());
	}

	return () -> CheckpointStreamWithResultProvider.createSimpleStream(
		CheckpointedStateScope.EXCLUSIVE,
		primaryStreamFactory);
}
 
示例4
/**
 * Checks that a failure occurring in the uploader's thread pool surfaces in the thread that
 * triggered the upload.
 */
@Test
public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
	final SpecifiedException expected =
		new SpecifiedException("throw exception while multi thread upload states.");

	// Every stream request is answered with the same stream created from the expected exception.
	final CheckpointStreamFactory.CheckpointStateOutputStream brokenStream =
		createFailingCheckpointStateOutputStream(expected);
	final CheckpointStreamFactory brokenStreamFactory = (CheckpointedStateScope scope) -> brokenStream;

	final File sourceFile = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
	generateRandomFileContent(sourceFile.getPath(), 20);

	final Map<StateHandleID, Path> pathsByHandle = new HashMap<>(1);
	pathsByHandle.put(new StateHandleID("mockHandleID"), new Path(sourceFile.getPath()));

	try (RocksDBStateUploader stateUploader = new RocksDBStateUploader(5)) {
		stateUploader.uploadFilesToCheckpointFs(pathsByHandle, brokenStreamFactory, new CloseableRegistry());
		// The upload must not complete normally.
		fail();
	} catch (Exception e) {
		assertEquals(expected, e);
	}
}
 
示例5
/**
 * Picks the stream-creating supplier for a snapshot.
 *
 * <p>A simple stream is used when local recovery is disabled or when the checkpoint type
 * reports itself as a savepoint. In the remaining case a duplicating stream is created, which
 * additionally receives the local state directory provider. Both variants request the
 * {@code CheckpointedStateScope.EXCLUSIVE} scope.
 *
 * @param checkpointId id passed to the duplicating stream
 * @param primaryStreamFactory stream factory passed to either stream variant
 * @param checkpointOptions options whose checkpoint type is inspected
 * @return supplier producing the stream provider on demand
 */
private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(
	long checkpointId,
	CheckpointStreamFactory primaryStreamFactory,
	CheckpointOptions checkpointOptions) {

	// Same short-circuit order as a conjunction: the type is not read when local recovery is off.
	if (!localRecoveryConfig.isLocalRecoveryEnabled() || checkpointOptions.getCheckpointType().isSavepoint()) {
		return () -> CheckpointStreamWithResultProvider.createSimpleStream(
			CheckpointedStateScope.EXCLUSIVE,
			primaryStreamFactory);
	}

	return () -> CheckpointStreamWithResultProvider.createDuplicatingStream(
		checkpointId,
		CheckpointedStateScope.EXCLUSIVE,
		primaryStreamFactory,
		localRecoveryConfig.getLocalStateDirectoryProvider());
}
 
示例6
/**
 * Verifies that an exception thrown inside the uploader's thread pool is propagated to the
 * thread that called the upload.
 */
@Test
public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException {
	final SpecifiedException uploadFailure =
		new SpecifiedException("throw exception while multi thread upload states.");

	// The factory ignores the requested scope and always returns this one stream.
	final CheckpointStreamFactory.CheckpointStateOutputStream failingStream =
		createFailingCheckpointStateOutputStream(uploadFailure);
	final CheckpointStreamFactory failingFactory = (CheckpointedStateScope scope) -> failingStream;

	final File localFile = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
	generateRandomFileContent(localFile.getPath(), 20);

	final Map<StateHandleID, Path> toUpload = new HashMap<>(1);
	toUpload.put(new StateHandleID("mockHandleID"), localFile.toPath());

	try (RocksDBStateUploader uploader = new RocksDBStateUploader(5)) {
		uploader.uploadFilesToCheckpointFs(toUpload, failingFactory, new CloseableRegistry());
		// Reaching this line means no exception was propagated.
		fail();
	} catch (Exception e) {
		assertEquals(uploadFailure, e);
	}
}
 
示例7
/**
 * Creates a file-system checkpoint output stream for the given scope.
 *
 * <p>{@code EXCLUSIVE} scope targets {@code checkpointDirectory}; any other scope targets
 * {@code sharedStateDirectory}. The buffer size passed to the stream is the larger of
 * {@code DEFAULT_WRITE_BUFFER_SIZE} and {@code fileStateThreshold}.
 *
 * @param scope scope deciding the target directory
 * @return the newly created stream
 */
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
	final Path targetDirectory;
	if (scope == CheckpointedStateScope.EXCLUSIVE) {
		targetDirectory = checkpointDirectory;
	} else {
		targetDirectory = sharedStateDirectory;
	}

	final int writeBufferBytes = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);

	return new FsCheckpointStateOutputStream(targetDirectory, filesystem, writeBufferBytes, fileStateThreshold);
}
 
示例8
/**
 * Creates a {@code BlockingCheckpointOutputStream} wrapping a new in-memory checkpoint stream
 * and records it in {@code allCreatedStreams}. The {@code scope} argument is not used.
 *
 * @param scope requested scope (ignored)
 * @return the newly created blocking stream
 */
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
	CheckpointedStateScope scope) throws IOException {

	final MemCheckpointStreamFactory.MemoryCheckpointOutputStream memoryStream =
		new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);

	final BlockingCheckpointOutputStream created =
		new BlockingCheckpointOutputStream(memoryStream, waiter, blocker, afterNumberInvocations);

	// Remember every stream handed out by this factory.
	allCreatedStreams.add(created);

	return created;
}
 
示例9
/**
 * Tests that closing the StateSnapshotContextSynchronousImpl also closes the output streams it
 * created and removes them from the closeable registry.
 */
@Test
public void testStreamClosingWhenClosing() throws Exception {
	final CheckpointStreamFactory.CheckpointStateOutputStream firstStream =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
	final CheckpointStreamFactory.CheckpointStateOutputStream secondStream =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);

	// The mocked factory is stubbed to return the two stream mocks for EXCLUSIVE requests.
	final CheckpointStreamFactory factory = mock(CheckpointStreamFactory.class);
	when(factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
		.thenReturn(firstStream, secondStream);

	final InsightCloseableRegistry registry = new InsightCloseableRegistry();

	final StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
		42L, // checkpoint id
		1L, // checkpoint timestamp
		factory,
		new KeyGroupRange(0, 2),
		registry);

	// creating the output streams
	snapshotContext.getRawKeyedOperatorStateOutput();
	snapshotContext.getRawOperatorStateOutput();

	verify(factory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	// both streams must have been registered
	assertEquals(2, registry.size());
	assertTrue(registry.contains(firstStream));
	assertTrue(registry.contains(secondStream));

	snapshotContext.close();

	verify(firstStream).close();
	verify(secondStream).close();

	assertEquals(0, registry.size());
}
 
示例10
/**
 * Creates a file-system checkpoint output stream for the given scope.
 *
 * <p>{@code EXCLUSIVE} scope writes below {@code checkpointDirectory}, every other scope below
 * {@code sharedStateDirectory}. The buffer size handed to the stream is the larger of
 * {@code writeBufferSize} and {@code fileStateThreshold}.
 *
 * @param scope scope deciding the target directory
 * @return the newly created stream
 */
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
	final Path targetDirectory;
	if (scope == CheckpointedStateScope.EXCLUSIVE) {
		targetDirectory = checkpointDirectory;
	} else {
		targetDirectory = sharedStateDirectory;
	}

	final int effectiveBufferSize = Math.max(writeBufferSize, fileStateThreshold);

	return new FsCheckpointStateOutputStream(targetDirectory, filesystem, effectiveBufferSize, fileStateThreshold);
}
 
示例11
/**
 * Wraps a fresh in-memory checkpoint stream in a {@code BlockingCheckpointOutputStream},
 * adds the wrapper to {@code allCreatedStreams} and returns it. The requested scope does not
 * influence the result.
 *
 * @param scope requested scope (ignored)
 * @return the newly created blocking stream
 */
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
	CheckpointedStateScope scope) throws IOException {

	final MemCheckpointStreamFactory.MemoryCheckpointOutputStream delegate =
		new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);

	final BlockingCheckpointOutputStream wrapper =
		new BlockingCheckpointOutputStream(delegate, waiter, blocker, afterNumberInvocations);

	allCreatedStreams.add(wrapper);

	return wrapper;
}
 
示例12
/**
 * Tests that {@code closeExceptionally()} on the StateSnapshotContextSynchronousImpl closes
 * the output streams it created and removes them from the closeable registry.
 */
@Test
public void testStreamClosingExceptionally() throws Exception {
	final CheckpointStreamFactory.CheckpointStateOutputStream firstStream =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
	final CheckpointStreamFactory.CheckpointStateOutputStream secondStream =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);

	// The mocked factory is stubbed to return the two stream mocks for EXCLUSIVE requests.
	final CheckpointStreamFactory factory = mock(CheckpointStreamFactory.class);
	when(factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
		.thenReturn(firstStream, secondStream);

	final InsightCloseableRegistry registry = new InsightCloseableRegistry();

	final StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
		42L, // checkpoint id
		1L, // checkpoint timestamp
		factory,
		new KeyGroupRange(0, 2),
		registry);

	// creating the output streams
	snapshotContext.getRawKeyedOperatorStateOutput();
	snapshotContext.getRawOperatorStateOutput();

	verify(factory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	// both streams must have been registered
	assertEquals(2, registry.size());
	assertTrue(registry.contains(firstStream));
	assertTrue(registry.contains(secondStream));

	snapshotContext.closeExceptionally();

	verify(firstStream).close();
	verify(secondStream).close();

	assertEquals(0, registry.size());
}
 
示例13
/**
 * Applies {@code transformer} to the operator state of one subtask and snapshots the result.
 *
 * <p>If no transformer is configured, the input state is returned unchanged. Otherwise an
 * operator state backend is restored from {@code baseState}, passed to the transformer together
 * with the subtask id, and snapshotted. Each stream requested during the snapshot writes to a
 * new randomly named path under {@code outDir}.
 *
 * @param outDir directory under which the snapshot stream paths are created
 * @param subtaskId subtask id passed to the transformer
 * @param baseState state the backend is restored from
 * @return collection holding the single new snapshot handle, or {@code baseState} when there is
 *         no transformer
 * @throws RuntimeException wrapping any exception raised during restore, transform or snapshot
 */
private StateObjectCollection<OperatorStateHandle> transformSubtaskOpState(Path outDir, Integer subtaskId,
		StateObjectCollection<OperatorStateHandle> baseState) {

	if (transformer == null) {
		return baseState;
	}

	try (OperatorStateBackend opBackend = OperatorStateReader.restoreOperatorStateBackend(baseState)) {

		transformer.accept(subtaskId, opBackend);

		// Every requested stream gets its own UUID-named path below outDir.
		final CheckpointStreamFactory fileStreamFactory = new CheckpointStreamFactory() {
			@Override
			public CheckpointStateOutputStream createCheckpointStateOutputStream(
					CheckpointedStateScope scope)
					throws IOException {
				return new FileBasedStateOutputStream(
						outDir.getFileSystem(),
						new Path(outDir, String.valueOf(UUID.randomUUID())));
			}
		};

		final OperatorStateHandle transformedSnapshot = opBackend
				.snapshot(checkpointId, System.currentTimeMillis(), fileStreamFactory, null)
				.get()
				.getJobManagerOwnedSnapshot();

		return new StateObjectCollection<>(Lists.newArrayList(transformedSnapshot));
	} catch (Exception e) {
		throw new RuntimeException(e);
	}
}
 
示例14
/**
 * Creates a file-system checkpoint output stream for the given scope.
 *
 * <p>{@code EXCLUSIVE} scope targets {@code checkpointDirectory}, any other scope
 * {@code sharedStateDirectory}. The buffer size is the larger of {@code writeBufferSize} and
 * {@code fileStateThreshold}. The last constructor argument is {@code true} only when entropy
 * injection is off and the scope is not {@code SHARED}.
 *
 * @param scope scope deciding the target directory and the final constructor flag
 * @return the newly created stream
 */
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
	final Path targetDirectory;
	if (scope == CheckpointedStateScope.EXCLUSIVE) {
		targetDirectory = checkpointDirectory;
	} else {
		targetDirectory = sharedStateDirectory;
	}

	final int effectiveBufferSize = Math.max(writeBufferSize, fileStateThreshold);

	// Negation of "entropyInjecting || scope == SHARED", spelled out via De Morgan.
	final boolean relativePath = !entropyInjecting && scope != CheckpointedStateScope.SHARED;

	return new FsCheckpointStateOutputStream(
		targetDirectory, filesystem, effectiveBufferSize, fileStateThreshold, relativePath);
}
 
示例15
/**
 * Checks that a write as large as the threshold reaches the file immediately, while
 * subsequent smaller writes leave the file length unchanged.
 */
@Test
@SuppressWarnings("ConstantConditions")
public void testWriteFlushesIfAboveThreshold() throws IOException {
	final int threshold = 100;
	final FsCheckpointStreamFactory streamFactory =
		createFactory(FileSystem.getLocalFileSystem(), threshold, threshold);
	final FsCheckpointStreamFactory.FsCheckpointStateOutputStream out =
		streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	out.write(new byte[threshold]);

	// exactly one file of threshold size must exist in the exclusive state directory
	final File[] stateFiles = new File(exclusiveStateDir.toUri()).listFiles();
	assertEquals(1, stateFiles.length);
	final File stateFile = stateFiles[0];
	assertEquals(threshold, stateFile.length());

	out.write(new byte[threshold - 1]); // should buffer without flushing
	out.write(127); // should buffer without flushing
	assertEquals(threshold, stateFile.length());
}
 
示例16
/**
 * Checks that a stream created for the EXCLUSIVE scope yields a {@code RelativeFileStateHandle}
 * whose file lives directly inside the exclusive state directory.
 */
@Test
public void testExclusiveStateHasRelativePathHandles() throws IOException {
	final FsCheckpointStreamFactory streamFactory = createFactory(FileSystem.getLocalFileSystem(), 0);

	final FsCheckpointStreamFactory.FsCheckpointStateOutputStream out =
			streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
	out.write(1657);
	final StreamStateHandle stateHandle = out.closeAndGetHandle();

	assertThat(stateHandle, instanceOf(RelativeFileStateHandle.class));

	final RelativeFileStateHandle relativeHandle = (RelativeFileStateHandle) stateHandle;
	assertPathsEqual(exclusiveStateDir, relativeHandle.getFilePath().getParent());
}
 
示例17
/**
 * Checks that a stream created for the SHARED scope yields a plain {@code FileStateHandle}
 * (not a {@code RelativeFileStateHandle}) whose file lives directly inside the shared state
 * directory.
 */
@Test
public void testSharedStateHasAbsolutePathHandles() throws IOException {
	final FsCheckpointStreamFactory streamFactory = createFactory(FileSystem.getLocalFileSystem(), 0);

	final FsCheckpointStreamFactory.FsCheckpointStateOutputStream out =
		streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
	out.write(0);
	final StreamStateHandle stateHandle = out.closeAndGetHandle();

	assertThat(stateHandle, instanceOf(FileStateHandle.class));
	assertThat(stateHandle, not(instanceOf(RelativeFileStateHandle.class)));

	final FileStateHandle fileHandle = (FileStateHandle) stateHandle;
	assertPathsEqual(sharedStateDir, fileHandle.getFilePath().getParent());
}
 
示例18
/**
 * Checks that, with a factory built on {@code TestEntropyAwareFs}, a stream for the EXCLUSIVE
 * scope yields a plain {@code FileStateHandle} (not a {@code RelativeFileStateHandle}) located
 * in the exclusive state directory.
 */
@Test
public void testEntropyMakesExclusiveStateAbsolutePaths() throws IOException{
	final FsCheckpointStreamFactory streamFactory =
		createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), 0);

	final FsCheckpointStreamFactory.FsCheckpointStateOutputStream out =
		streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
	out.write(0);
	final StreamStateHandle stateHandle = out.closeAndGetHandle();

	assertThat(stateHandle, instanceOf(FileStateHandle.class));
	assertThat(stateHandle, not(instanceOf(RelativeFileStateHandle.class)));

	final FileStateHandle fileHandle = (FileStateHandle) stateHandle;
	assertPathsEqual(exclusiveStateDir, fileHandle.getFilePath().getParent());
}
 
示例19
/**
 * Writes {@code bytesToFlush} zero bytes to a new EXCLUSIVE-scope stream, flushes it, and
 * asserts how many files exist in the exclusive state directory afterwards.
 *
 * @param minFileSize value passed to {@code createFactory}
 * @param bytesToFlush number of bytes written before flushing
 * @param expectEmpty {@code true} to expect no file, {@code false} to expect exactly one
 */
private void flushAndVerify(int minFileSize, int bytesToFlush, boolean expectEmpty) throws IOException {
	final FsCheckpointStreamFactory streamFactory =
			createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), minFileSize);
	final FsCheckpointStreamFactory.FsCheckpointStateOutputStream out =
			streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	final byte[] payload = new byte[bytesToFlush];
	out.write(payload, 0, bytesToFlush);
	out.flush();

	final int expectedFileCount;
	if (expectEmpty) {
		expectedFileCount = 0;
	} else {
		expectedFileCount = 1;
	}
	assertEquals(expectedFileCount, new File(exclusiveStateDir.toUri()).listFiles().length);
}
 
示例20
/**
 * Returns a new {@code BlockingCheckpointOutputStream} built around a new in-memory checkpoint
 * stream, after adding it to {@code allCreatedStreams}. The {@code scope} argument is unused.
 *
 * @param scope requested scope (ignored)
 * @return the newly created blocking stream
 */
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
	CheckpointedStateScope scope) throws IOException {

	final MemCheckpointStreamFactory.MemoryCheckpointOutputStream inMemoryStream =
		new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);

	final BlockingCheckpointOutputStream result =
		new BlockingCheckpointOutputStream(inMemoryStream, waiter, blocker, afterNumberInvocations);

	// Track the stream alongside all previously created ones.
	allCreatedStreams.add(result);

	return result;
}
 
示例21
/**
 * Verifies that calling {@code closeExceptionally()} on the snapshot context closes both
 * streams obtained from the factory and leaves the closeable registry empty.
 */
@Test
public void testStreamClosingExceptionally() throws Exception {
	final CheckpointStreamFactory.CheckpointStateOutputStream streamA =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
	final CheckpointStreamFactory.CheckpointStateOutputStream streamB =
		mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);

	// Stub the factory so that EXCLUSIVE requests return the two stream mocks.
	final CheckpointStreamFactory mockedFactory = mock(CheckpointStreamFactory.class);
	when(mockedFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))
		.thenReturn(streamA, streamB);

	final InsightCloseableRegistry trackedCloseables = new InsightCloseableRegistry();

	final StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
		42L, // checkpoint id
		1L, // checkpoint timestamp
		mockedFactory,
		new KeyGroupRange(0, 2),
		trackedCloseables);

	// creating the output streams
	snapshotContext.getRawKeyedOperatorStateOutput();
	snapshotContext.getRawOperatorStateOutput();

	verify(mockedFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	// both streams must be tracked before closing
	assertEquals(2, trackedCloseables.size());
	assertTrue(trackedCloseables.contains(streamA));
	assertTrue(trackedCloseables.contains(streamB));

	snapshotContext.closeExceptionally();

	verify(streamA).close();
	verify(streamB).close();

	assertEquals(0, trackedCloseables.size());
}
 
示例22
/**
 * Copies the file at {@code filePath} into a new SHARED-scope checkpoint stream and returns
 * the resulting state handle.
 *
 * <p>Both streams are registered with {@code closeableRegistry} while in use. The handle is
 * only obtained if the output stream can still be unregistered; otherwise {@code null} is
 * returned (presumably because the registry already closed the stream; confirm against
 * {@code CloseableRegistry}).
 *
 * @param filePath file to copy
 * @param checkpointStreamFactory factory creating the output stream
 * @param closeableRegistry registry tracking the streams during the copy
 * @return handle of the written state, or {@code null} if the output stream was no longer
 *         registered
 * @throws IOException if opening, reading, writing or closing fails
 */
private StreamStateHandle uploadLocalFileToCheckpointFs(
	Path filePath,
	CheckpointStreamFactory checkpointStreamFactory,
	CloseableRegistry closeableRegistry) throws IOException {
	FSDataInputStream source = null;
	CheckpointStreamFactory.CheckpointStateOutputStream target = null;

	try {
		final byte[] chunk = new byte[READ_BUFFER_SIZE];

		source = filePath.getFileSystem().open(filePath);
		closeableRegistry.registerCloseable(source);

		target = checkpointStreamFactory
			.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
		closeableRegistry.registerCloseable(target);

		// copy until the input reports end of stream
		int bytesRead;
		while ((bytesRead = source.read(chunk)) != -1) {
			target.write(chunk, 0, bytesRead);
		}

		if (!closeableRegistry.unregisterCloseable(target)) {
			return null;
		}

		final StreamStateHandle handle = target.closeAndGetHandle();
		// the stream is closed now; keep the finally block from touching it again
		target = null;
		return handle;

	} finally {

		// only close what is still registered at this point
		if (closeableRegistry.unregisterCloseable(source)) {
			IOUtils.closeQuietly(source);
		}

		if (closeableRegistry.unregisterCloseable(target)) {
			IOUtils.closeQuietly(target);
		}
	}
}
 
示例23
/**
 * Writes the keyed backend serialization proxy (key serializer plus state meta info snapshots)
 * to an EXCLUSIVE-scope checkpoint stream and returns the finalized stream result.
 *
 * <p>A duplicating stream is used when local recovery is enabled, a simple stream otherwise.
 * The stream is registered with {@code snapshotCloseableRegistry} while it is written.
 *
 * @return result of closing and finalizing the checkpoint stream
 * @throws IOException if the stream could no longer be unregistered before finalizing
 * @throws Exception if creating or writing the stream fails
 */
@Nonnull
private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {

	CheckpointStreamWithResultProvider metaStream;
	if (localRecoveryConfig.isLocalRecoveryEnabled()) {
		metaStream = CheckpointStreamWithResultProvider.createDuplicatingStream(
			checkpointId,
			CheckpointedStateScope.EXCLUSIVE,
			checkpointStreamFactory,
			localRecoveryConfig.getLocalStateDirectoryProvider());
	} else {
		metaStream = CheckpointStreamWithResultProvider.createSimpleStream(
			CheckpointedStateScope.EXCLUSIVE,
			checkpointStreamFactory);
	}

	snapshotCloseableRegistry.registerCloseable(metaStream);

	try {
		//no need for compression scheme support because sst-files are already compressed
		final KeyedBackendSerializationProxy<K> proxy =
			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoSnapshots, false);

		proxy.write(new DataOutputViewStreamWrapper(metaStream.getCheckpointOutputStream()));

		if (!snapshotCloseableRegistry.unregisterCloseable(metaStream)) {
			throw new IOException("Stream already closed and cannot return a handle.");
		}

		final SnapshotResult<StreamStateHandle> finalized =
			metaStream.closeAndFinalizeCheckpointStreamResult();
		// finalized successfully; nothing left for the finally block to clean up
		metaStream = null;
		return finalized;
	} finally {
		if (metaStream != null && snapshotCloseableRegistry.unregisterCloseable(metaStream)) {
			IOUtils.closeQuietly(metaStream);
		}
	}
}
 
示例24
/**
 * Requests a new checkpoint state output stream from {@code factory}, always using the
 * {@code CheckpointedStateScope.EXCLUSIVE} scope.
 *
 * @return the stream returned by the factory
 * @throws IOException if the factory fails to create the stream
 */
@Override
public CheckpointStateOutputStream get() throws IOException {
	return factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
}
 
示例25
/**
 * Returns a new {@code MemoryCheckpointOutputStream} constructed with {@code maxStateSize}.
 * The {@code scope} argument is not consulted.
 *
 * @param scope requested scope (ignored)
 * @return the newly created in-memory stream
 */
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
		CheckpointedStateScope scope) throws IOException
{
	return new MemoryCheckpointOutputStream(maxStateSize);
}
 
示例26
/**
 * Checks that exclusive-scope and shared-scope streams of an {@code FsCheckpointStorageLocation}
 * produce their files in the checkpoint directory and the shared state directory respectively.
 */
@Test
public void testDirectoriesForExclusiveAndSharedState() throws Exception {
	final FileSystem localFs = LocalFileSystem.getSharedInstance();
	final Path exclusiveDir = randomTempPath();
	final Path sharedDir = randomTempPath();

	final FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
			localFs,
			exclusiveDir,
			sharedDir,
			randomTempPath(),
			CheckpointStorageLocationReference.getDefault(),
			FILE_SIZE_THRESHOLD);

	assertNotEquals(location.getCheckpointDirectory(), location.getSharedStateDirectory());

	// both directories start out empty
	assertEquals(0, localFs.listStatus(location.getCheckpointDirectory()).length);
	assertEquals(0, localFs.listStatus(location.getSharedStateDirectory()).length);

	// create exclusive state

	final CheckpointStateOutputStream exclusiveOut =
			location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

	exclusiveOut.write(42);
	exclusiveOut.flush();
	final StreamStateHandle exclusiveState = exclusiveOut.closeAndGetHandle();

	// only the checkpoint directory gained a file
	assertEquals(1, localFs.listStatus(location.getCheckpointDirectory()).length);
	assertEquals(0, localFs.listStatus(location.getSharedStateDirectory()).length);

	// create shared state

	final CheckpointStateOutputStream sharedOut =
			location.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);

	sharedOut.write(42);
	sharedOut.flush();
	final StreamStateHandle sharedState = sharedOut.closeAndGetHandle();

	// now each directory holds exactly one file
	assertEquals(1, localFs.listStatus(location.getCheckpointDirectory()).length);
	assertEquals(1, localFs.listStatus(location.getSharedStateDirectory()).length);

	// drop state

	exclusiveState.discardState();
	sharedState.discardState();
}
 
示例27
/**
 * Returns whatever {@code supplier.get()} produces. The {@code scope} argument is not
 * consulted.
 *
 * @param scope requested scope (ignored)
 * @return the stream obtained from the supplier
 */
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) {
	return supplier.get();
}
 
示例28
/**
 * Returns whatever {@code streamFactory.get()} produces. The {@code scope} argument is not
 * consulted.
 *
 * @param scope requested scope (ignored)
 * @return the stream obtained from {@code streamFactory}
 * @throws IOException declared by the overridden method; NOTE(review): whether
 *         {@code streamFactory.get()} can throw it is not visible here
 */
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
	return streamFactory.get();
}
 
示例29
/**
 * Uploads one local file by streaming its bytes into a new SHARED-scope checkpoint stream.
 *
 * <p>The input and output streams are registered with {@code closeableRegistry} for the
 * duration of the copy. After the copy the output stream is unregistered; only if that
 * succeeds is it closed to obtain the state handle. If unregistering fails the method returns
 * {@code null} (presumably the registry closed the stream in the meantime; confirm against
 * {@code CloseableRegistry}).
 *
 * @param filePath file to upload
 * @param checkpointStreamFactory factory creating the output stream
 * @param closeableRegistry registry tracking the streams during the copy
 * @return handle of the uploaded state, or {@code null} if the output stream was no longer
 *         registered
 * @throws IOException if opening, reading, writing or closing fails
 */
private StreamStateHandle uploadLocalFileToCheckpointFs(
	Path filePath,
	CheckpointStreamFactory checkpointStreamFactory,
	CloseableRegistry closeableRegistry) throws IOException {
	FSDataInputStream in = null;
	CheckpointStreamFactory.CheckpointStateOutputStream out = null;

	try {
		final byte[] readBuffer = new byte[READ_BUFFER_SIZE];

		in = filePath.getFileSystem().open(filePath);
		closeableRegistry.registerCloseable(in);

		out = checkpointStreamFactory
			.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
		closeableRegistry.registerCloseable(out);

		// read() returning -1 marks the end of the input
		for (int count = in.read(readBuffer); count != -1; count = in.read(readBuffer)) {
			out.write(readBuffer, 0, count);
		}

		if (!closeableRegistry.unregisterCloseable(out)) {
			return null;
		}

		final StreamStateHandle uploaded = out.closeAndGetHandle();
		// already closed via closeAndGetHandle; the finally block must skip it
		out = null;
		return uploaded;

	} finally {

		// close quietly whatever is still registered
		if (closeableRegistry.unregisterCloseable(in)) {
			IOUtils.closeQuietly(in);
		}

		if (closeableRegistry.unregisterCloseable(out)) {
			IOUtils.closeQuietly(out);
		}
	}
}
 
示例30
/**
 * Serializes the keyed backend meta data (key serializer and state meta info snapshots) into
 * an EXCLUSIVE-scope checkpoint stream and returns the finalized result.
 *
 * <p>With local recovery enabled the stream is a duplicating stream, otherwise a simple one.
 * While being written, the stream is registered with {@code snapshotCloseableRegistry}.
 *
 * @return result of closing and finalizing the checkpoint stream
 * @throws IOException if the stream could no longer be unregistered before finalizing
 * @throws Exception if creating or writing the stream fails
 */
@Nonnull
private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {

	CheckpointStreamWithResultProvider provider;
	if (localRecoveryConfig.isLocalRecoveryEnabled()) {
		provider = CheckpointStreamWithResultProvider.createDuplicatingStream(
			checkpointId,
			CheckpointedStateScope.EXCLUSIVE,
			checkpointStreamFactory,
			localRecoveryConfig.getLocalStateDirectoryProvider());
	} else {
		provider = CheckpointStreamWithResultProvider.createSimpleStream(
			CheckpointedStateScope.EXCLUSIVE,
			checkpointStreamFactory);
	}

	snapshotCloseableRegistry.registerCloseable(provider);

	try {
		//no need for compression scheme support because sst-files are already compressed
		final KeyedBackendSerializationProxy<K> metaDataProxy =
			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoSnapshots, false);

		final DataOutputView metaDataOut =
			new DataOutputViewStreamWrapper(provider.getCheckpointOutputStream());
		metaDataProxy.write(metaDataOut);

		// A failed unregister means there is no open stream left to take a handle from.
		if (!snapshotCloseableRegistry.unregisterCloseable(provider)) {
			throw new IOException("Stream already closed and cannot return a handle.");
		}

		final SnapshotResult<StreamStateHandle> snapshotResult =
			provider.closeAndFinalizeCheckpointStreamResult();
		provider = null;
		return snapshotResult;
	} finally {
		// Only reached with a non-null provider when finalizing did not complete.
		if (provider != null && snapshotCloseableRegistry.unregisterCloseable(provider)) {
			IOUtils.closeQuietly(provider);
		}
	}
}