Java源码示例:org.apache.flink.util.ShutdownHookUtil
示例1
/**
 * Creates the manager, makes sure every local state root directory exists and registers a
 * shutdown hook that invokes {@code shutdown()}.
 *
 * @param localRecoveryEnabled flag stored on this instance as-is
 * @param localStateRootDirectories root directories for local state; each one is created if missing
 * @param discardExecutor executor stored on this instance as-is
 * @throws IOException if a root directory does not exist and could not be created
 */
public TaskExecutorLocalStateStoresManager(
        boolean localRecoveryEnabled,
        @Nonnull File[] localStateRootDirectories,
        @Nonnull Executor discardExecutor) throws IOException {

    this.taskStateStoresByAllocationID = new HashMap<>();
    this.localRecoveryEnabled = localRecoveryEnabled;
    this.localStateRootDirectories = localStateRootDirectories;
    this.discardExecutor = discardExecutor;
    this.lock = new Object();
    this.closed = false;

    for (File rootDir : localStateRootDirectories) {
        if (rootDir.exists() || rootDir.mkdirs()) {
            continue;
        }
        // mkdirs() reported failure, but another task may have created the directory
        // concurrently - check once more before giving up.
        if (!rootDir.exists()) {
            throw new IOException("Could not create root directory for local recovery: " +
                rootDir);
        }
    }

    // Have shutdown() invoked through the registered shutdown hook as well.
    this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);
}
示例2
/**
 * Shuts this manager down: marks it closed, deregisters the shutdown hook and releases
 * every registered local state store together with its allocation base directories.
 * Only the first call does any work; later calls return immediately.
 */
public void shutdown() {
    final HashMap<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> snapshot;

    synchronized (lock) {
        if (closed) {
            return;
        }
        closed = true;
        // Copy and empty the registry under the lock; the release work runs outside of it.
        snapshot = new HashMap<>(taskStateStoresByAllocationID);
        taskStateStoresByAllocationID.clear();
    }

    ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
    LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");

    snapshot.forEach((allocationId, storesOfAllocation) -> {
        doRelease(storesOfAllocation.values());
        cleanupAllocationBaseDirs(allocationId);
    });
}
示例3
/**
 * Cancels the cleanup task and, on the first invocation only, deletes the storage
 * directory and deregisters the shutdown hook.
 *
 * @throws IOException if cleaning up fails, e.g. when the storage directory cannot be deleted
 */
@Override
public void close() throws IOException {
    cancelCleanupTask();

    // Only the caller that flips the flag performs the actual shutdown work.
    if (!shutdownRequested.compareAndSet(false, true)) {
        return;
    }

    log.info("Shutting down BLOB cache");

    try {
        // Clean up the storage directory
        FileUtils.deleteDirectory(storageDir);
    } finally {
        // Remove shutdown hook to prevent resource leaks
        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), log);
    }
}
示例4
/**
 * Creates the manager, makes sure every local state root directory exists and registers a
 * shutdown hook that invokes {@code shutdown()}.
 *
 * @param localRecoveryEnabled flag stored on this instance as-is
 * @param localStateRootDirectories root directories for local state; each one is created if missing
 * @param discardExecutor executor stored on this instance as-is
 * @throws IOException if a root directory does not exist and could not be created
 */
public TaskExecutorLocalStateStoresManager(
        boolean localRecoveryEnabled,
        @Nonnull File[] localStateRootDirectories,
        @Nonnull Executor discardExecutor) throws IOException {

    this.taskStateStoresByAllocationID = new HashMap<>();
    this.localRecoveryEnabled = localRecoveryEnabled;
    this.localStateRootDirectories = localStateRootDirectories;
    this.discardExecutor = discardExecutor;
    this.lock = new Object();
    this.closed = false;

    for (File rootDir : localStateRootDirectories) {
        if (rootDir.exists() || rootDir.mkdirs()) {
            continue;
        }
        // mkdirs() reported failure, but another task may have created the directory
        // concurrently - check once more before giving up.
        if (!rootDir.exists()) {
            throw new IOException("Could not create root directory for local recovery: " +
                rootDir);
        }
    }

    // Have shutdown() invoked through the registered shutdown hook as well.
    this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);
}
示例5
/**
 * Shuts this manager down: marks it closed, deregisters the shutdown hook and releases
 * every registered local state store together with its allocation base directories.
 * Only the first call does any work; later calls return immediately.
 */
public void shutdown() {
    final HashMap<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> snapshot;

    synchronized (lock) {
        if (closed) {
            return;
        }
        closed = true;
        // Copy and empty the registry under the lock; the release work runs outside of it.
        snapshot = new HashMap<>(taskStateStoresByAllocationID);
        taskStateStoresByAllocationID.clear();
    }

    ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
    LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");

    snapshot.forEach((allocationId, storesOfAllocation) -> {
        doRelease(storesOfAllocation.values());
        cleanupAllocationBaseDirs(allocationId);
    });
}
示例6
/**
 * Cancels the cleanup task and, on the first invocation only, deletes the storage
 * directory and deregisters the shutdown hook.
 *
 * @throws IOException if cleaning up fails, e.g. when the storage directory cannot be deleted
 */
@Override
public void close() throws IOException {
    cancelCleanupTask();

    // Only the caller that flips the flag performs the actual shutdown work.
    if (!shutdownRequested.compareAndSet(false, true)) {
        return;
    }

    log.info("Shutting down BLOB cache");

    try {
        // Clean up the storage directory
        FileUtils.deleteDirectory(storageDir);
    } finally {
        // Remove shutdown hook to prevent resource leaks
        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), log);
    }
}
示例7
/**
 * Creates the manager, makes sure every local state root directory exists and registers a
 * shutdown hook that invokes {@code shutdown()}.
 *
 * @param localRecoveryEnabled flag stored on this instance as-is
 * @param localStateRootDirectories root directories for local state; each one is created if missing
 * @param discardExecutor executor stored on this instance as-is
 * @throws IOException if a root directory does not exist and could not be created
 */
public TaskExecutorLocalStateStoresManager(
        boolean localRecoveryEnabled,
        @Nonnull File[] localStateRootDirectories,
        @Nonnull Executor discardExecutor) throws IOException {

    this.taskStateStoresByAllocationID = new HashMap<>();
    this.localRecoveryEnabled = localRecoveryEnabled;
    this.localStateRootDirectories = localStateRootDirectories;
    this.discardExecutor = discardExecutor;
    this.lock = new Object();
    this.closed = false;

    for (File rootDir : localStateRootDirectories) {
        if (rootDir.exists() || rootDir.mkdirs()) {
            continue;
        }
        // mkdirs() reported failure, but another task may have created the directory
        // concurrently - check once more before giving up.
        if (!rootDir.exists()) {
            throw new IOException("Could not create root directory for local recovery: " +
                rootDir);
        }
    }

    // Have shutdown() invoked through the registered shutdown hook as well.
    this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);
}
示例8
/**
 * Shuts this manager down: marks it closed, deregisters the shutdown hook and releases
 * every registered local state store together with its allocation base directories.
 * Only the first call does any work; later calls return immediately.
 */
public void shutdown() {
    final HashMap<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> snapshot;

    synchronized (lock) {
        if (closed) {
            return;
        }
        closed = true;
        // Copy and empty the registry under the lock; the release work runs outside of it.
        snapshot = new HashMap<>(taskStateStoresByAllocationID);
        taskStateStoresByAllocationID.clear();
    }

    ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
    LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");

    snapshot.forEach((allocationId, storesOfAllocation) -> {
        doRelease(storesOfAllocation.values());
        cleanupAllocationBaseDirs(allocationId);
    });
}
示例9
/**
 * Cancels the cleanup task and, on the first invocation only, deletes the storage
 * directory and deregisters the shutdown hook.
 *
 * @throws IOException if cleaning up fails, e.g. when the storage directory cannot be deleted
 */
@Override
public void close() throws IOException {
    cancelCleanupTask();

    // Only the caller that flips the flag performs the actual shutdown work.
    if (!shutdownRequested.compareAndSet(false, true)) {
        return;
    }

    log.info("Shutting down BLOB cache");

    try {
        // Clean up the storage directory
        FileUtils.deleteDirectory(storageDir);
    } finally {
        // Remove shutdown hook to prevent resource leaks
        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), log);
    }
}
示例10
/**
 * Entry point of the helper process: creates a {@code FileChannelManager}, optionally
 * registers an additional shutdown hook on behalf of the caller, creates the signal file
 * and then sleeps so that the process can be killed from outside.
 *
 * @param args {@code args[0]}: whether a caller-side hook is registered;
 *             {@code args[1]}: temporary directory; {@code args[2]}: path of the signal file
 * @throws Exception if creating the signal file fails or the sleep is interrupted
 */
public static void main(String[] args) throws Exception {
    final boolean callerHasHook = Boolean.parseBoolean(args[0]);
    final String tmpDirectory = args[1];
    final File signalFile = new File(args[2]);

    final FileChannelManager manager =
        new FileChannelManagerImpl(new String[]{tmpDirectory}, DIR_NAME_PREFIX);

    if (callerHasHook) {
        // Verifies the case that both FileChannelManager and its upper component
        // have registered shutdown hooks, like in IOManager.
        ShutdownHookUtil.addShutdownHook(manager::close, "Caller", LOG);
    }

    // Signals the main process to execute the kill action.
    signalFile.createNewFile();

    // Blocks the process to wait to be killed.
    final long sleepMillis = 3 * TEST_TIMEOUT.toMillis();
    Thread.sleep(sleepMillis);
}
示例11
/**
 * Releases the resources held by this object: deregisters the shutdown hook, then ends
 * the session for {@code jobID} and stops the plan executor, if one is set.
 *
 * @throws RuntimeException if ending the session or stopping the executor fails; the
 *         original exception is attached as the cause
 */
protected void dispose() {
    // Remove shutdown hook to prevent resource leaks
    ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

    try {
        // Read the field once so the null check and the calls use the same instance.
        PlanExecutor executor = this.executor;
        if (executor != null) {
            executor.endSession(jobID);
            executor.stop();
        }
    }
    catch (Exception e) {
        // Chain the underlying failure; dropping it would hide why the disposal failed.
        throw new RuntimeException("Failed to dispose the session shutdown hook.", e);
    }
}
示例12
/**
 * Closes this component: cancels the cleanup future, invalidates the job details cache,
 * deletes the storage directory and deregisters the shutdown hook.
 *
 * <p>The shutdown hook is deregistered even when deleting the storage directory fails,
 * so a failed close does not leave the hook registered.
 *
 * @throws IOException if the storage directory cannot be deleted
 */
@Override
public void close() throws IOException {
    cleanupFuture.cancel(false);
    jobDetailsCache.invalidateAll();

    try {
        // clean up the storage directory
        FileUtils.deleteFileOrDirectory(storageDir);
    } finally {
        // Remove shutdown hook to prevent resource leaks
        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
    }
}
示例13
/**
 * Sets up the cache: null-checks the arguments, creates the local storage directory,
 * reads the fetch-retry setting and registers a shutdown hook for this instance.
 *
 * @param blobClientConfig configuration to read from; must not be null
 * @param blobView blob view stored on this instance; must not be null
 * @param logger logger used by this instance; must not be null
 * @param serverAddress address stored on this instance; may be null
 * @throws IOException declared for set-up failures (presumably from creating the storage directory)
 */
public AbstractBlobCache(
        final Configuration blobClientConfig,
        final BlobView blobView,
        final Logger logger,
        @Nullable final InetSocketAddress serverAddress) throws IOException {

    this.log = checkNotNull(logger);
    this.blobClientConfig = checkNotNull(blobClientConfig);
    this.blobView = checkNotNull(blobView);
    this.readWriteLock = new ReentrantReadWriteLock();

    // configure and create the storage directory
    this.storageDir = BlobUtils.initLocalStorageDirectory(blobClientConfig);
    log.info("Created BLOB cache storage directory " + storageDir);

    // configure the number of fetch retries; a negative setting is replaced by zero
    final int configuredRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    if (configuredRetries < 0) {
        log.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
            BlobServerOptions.FETCH_RETRIES.key());
        this.numFetchRetries = 0;
    } else {
        this.numFetchRetries = configuredRetries;
    }

    // Add shutdown hook to delete storage directory
    shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), log);

    this.serverAddress = serverAddress;
}
示例14
/**
 * Registers a shutdown hook that calls {@code shutdown()} on the given cache.
 *
 * @param cache the cache whose {@code shutdown()} the hook invokes
 * @param logger logger handed on to {@code ShutdownHookUtil}
 * @return whatever {@code ShutdownHookUtil.addShutdownHook} returns for the registration
 */
private static Thread createShutdownHook(final FileCache cache, final Logger logger) {
    final String hookName = FileCache.class.getSimpleName();
    return ShutdownHookUtil.addShutdownHook(cache::shutdown, hookName, logger);
}
示例15
/**
 * Clean up of temporary directories created by the {@link ClusterEntrypoint}.
 *
 * <p>Deregisters the shutdown hook first and then deletes the directory configured
 * under {@code WebOptions.TMP_DIR}.
 *
 * @throws IOException if the temporary directories could not be cleaned up
 */
private void cleanupDirectories() throws IOException {
    ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG);

    final File webTmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
    FileUtils.deleteDirectory(webTmpDir);
}
示例16
/**
 * Releases the resources held by this object: deregisters the shutdown hook and stops
 * the plan executor, if one is set.
 *
 * @throws RuntimeException if stopping the executor fails; the original exception is
 *         attached as the cause
 */
protected void dispose() {
    // Remove shutdown hook to prevent resource leaks
    ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

    try {
        // Read the field once so the null check and the call use the same instance.
        PlanExecutor executor = this.executor;
        if (executor != null) {
            executor.stop();
        }
    }
    catch (Exception e) {
        // Chain the underlying failure; dropping it would hide why the disposal failed.
        throw new RuntimeException("Failed to dispose the session shutdown hook.", e);
    }
}
示例17
/**
 * Closes this component: cancels the cleanup future, invalidates the job details cache,
 * deletes the storage directory and deregisters the shutdown hook.
 *
 * <p>The shutdown hook is deregistered even when deleting the storage directory fails,
 * so a failed close does not leave the hook registered.
 *
 * @throws IOException if the storage directory cannot be deleted
 */
@Override
public void close() throws IOException {
    cleanupFuture.cancel(false);
    jobDetailsCache.invalidateAll();

    try {
        // clean up the storage directory
        FileUtils.deleteFileOrDirectory(storageDir);
    } finally {
        // Remove shutdown hook to prevent resource leaks
        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
    }
}
示例18
/**
 * Sets up the cache: null-checks the arguments, creates the local storage directory,
 * reads the fetch-retry setting and registers a shutdown hook for this instance.
 *
 * @param blobClientConfig configuration to read from; must not be null
 * @param blobView blob view stored on this instance; must not be null
 * @param logger logger used by this instance; must not be null
 * @param serverAddress address stored on this instance; may be null
 * @throws IOException declared for set-up failures (presumably from creating the storage directory)
 */
public AbstractBlobCache(
        final Configuration blobClientConfig,
        final BlobView blobView,
        final Logger logger,
        @Nullable final InetSocketAddress serverAddress) throws IOException {

    this.log = checkNotNull(logger);
    this.blobClientConfig = checkNotNull(blobClientConfig);
    this.blobView = checkNotNull(blobView);
    this.readWriteLock = new ReentrantReadWriteLock();

    // configure and create the storage directory
    this.storageDir = BlobUtils.initLocalStorageDirectory(blobClientConfig);
    log.info("Created BLOB cache storage directory " + storageDir);

    // configure the number of fetch retries; a negative setting is replaced by zero
    final int configuredRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    if (configuredRetries < 0) {
        log.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
            BlobServerOptions.FETCH_RETRIES.key());
        this.numFetchRetries = 0;
    } else {
        this.numFetchRetries = configuredRetries;
    }

    // Add shutdown hook to delete storage directory
    shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), log);

    this.serverAddress = serverAddress;
}
示例19
/**
 * Registers a shutdown hook that calls {@code shutdown()} on the given cache.
 *
 * @param cache the cache whose {@code shutdown()} the hook invokes
 * @param logger logger handed on to {@code ShutdownHookUtil}
 * @return whatever {@code ShutdownHookUtil.addShutdownHook} returns for the registration
 */
private static Thread createShutdownHook(final FileCache cache, final Logger logger) {
    final String hookName = FileCache.class.getSimpleName();
    return ShutdownHookUtil.addShutdownHook(cache::shutdown, hookName, logger);
}
示例20
/**
 * Clean up of temporary directories created by the {@link ClusterEntrypoint}.
 *
 * <p>Deregisters the shutdown hook first and then deletes the directory configured
 * under {@code WebOptions.TMP_DIR}.
 *
 * @throws IOException if the temporary directories could not be cleaned up
 */
private void cleanupDirectories() throws IOException {
    ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG);

    final File webTmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
    FileUtils.deleteDirectory(webTmpDir);
}
示例21
/**
 * Prepares the working directories: derives the archives, requirements and files
 * directory paths from a freshly created base directory path, makes sure the base
 * directory exists and registers a shutdown hook for this instance.
 *
 * @throws Exception if the base directory could not be created, or if an earlier step fails
 */
@Override
public void open() throws Exception {
    baseDirectory = createBaseDirectory(tmpDirectories);

    // All sub-directory paths live directly below the base directory.
    archivesDirectory = baseDirectory + File.separator + PYTHON_ARCHIVES_DIR;
    requirementsDirectory = baseDirectory + File.separator + PYTHON_REQUIREMENTS_DIR;
    filesDirectory = baseDirectory + File.separator + PYTHON_FILES_DIR;

    final File baseDir = new File(baseDirectory);
    final boolean baseDirAvailable = baseDir.exists() || baseDir.mkdir();
    if (!baseDirAvailable) {
        throw new IOException(
            "Could not create the base directory: " + baseDirectory);
    }

    final String hookName = ProcessPythonEnvironmentManager.class.getSimpleName();
    shutdownHook = ShutdownHookUtil.addShutdownHook(this, hookName, LOG);
}
示例22
/**
 * Deletes the working directory and deregisters the shutdown hook.
 *
 * <p>A failed deletion is logged as a warning and attempted again for as long as the
 * number of failures does not exceed {@code CHECK_TIMEOUT / CHECK_INTERVAL}; after that
 * the failure is logged once more and the deletion is abandoned without rethrowing.
 * The shutdown hook, if one is registered, is removed and its reference cleared in any case.
 */
@Override
public void close() throws Exception {
    try {
        for (int failedAttempts = 1; ; failedAttempts++) {
            try {
                FileUtils.deleteDirectory(new File(baseDirectory));
                break;
            } catch (Throwable t) {
                final boolean retry = failedAttempts <= CHECK_TIMEOUT / CHECK_INTERVAL;
                final String message = retry
                    ? String.format(
                        "Failed to delete the working directory %s of the Python UDF worker. Retrying...",
                        baseDirectory)
                    : String.format(
                        "Failed to delete the working directory %s of the Python UDF worker.", baseDirectory);
                LOG.warn(message, t);
                if (!retry) {
                    // Retry budget used up - give up on the deletion.
                    break;
                }
            }
        }
    } finally {
        if (shutdownHook != null) {
            ShutdownHookUtil.removeShutdownHook(
                shutdownHook, ProcessPythonEnvironmentManager.class.getSimpleName(), LOG);
            shutdownHook = null;
        }
    }
}
示例23
/**
 * Closes this component: cancels the cleanup future, invalidates the job details cache,
 * deletes the storage directory and deregisters the shutdown hook.
 *
 * <p>The shutdown hook is deregistered even when deleting the storage directory fails,
 * so a failed close does not leave the hook registered.
 *
 * @throws IOException if the storage directory cannot be deleted
 */
@Override
public void close() throws IOException {
    cleanupFuture.cancel(false);
    jobDetailsCache.invalidateAll();

    try {
        // clean up the storage directory
        FileUtils.deleteFileOrDirectory(storageDir);
    } finally {
        // Remove shutdown hook to prevent resource leaks
        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
    }
}
示例24
/**
 * Sets up the cache: null-checks the arguments, creates the local storage directory,
 * reads the fetch-retry setting and registers a shutdown hook for this instance.
 *
 * @param blobClientConfig configuration to read from; must not be null
 * @param blobView blob view stored on this instance; must not be null
 * @param logger logger used by this instance; must not be null
 * @param serverAddress address stored on this instance; may be null
 * @throws IOException declared for set-up failures (presumably from creating the storage directory)
 */
public AbstractBlobCache(
        final Configuration blobClientConfig,
        final BlobView blobView,
        final Logger logger,
        @Nullable final InetSocketAddress serverAddress) throws IOException {

    this.log = checkNotNull(logger);
    this.blobClientConfig = checkNotNull(blobClientConfig);
    this.blobView = checkNotNull(blobView);
    this.readWriteLock = new ReentrantReadWriteLock();

    // configure and create the storage directory
    this.storageDir = BlobUtils.initLocalStorageDirectory(blobClientConfig);
    log.info("Created BLOB cache storage directory " + storageDir);

    // configure the number of fetch retries; a negative setting is replaced by zero
    final int configuredRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
    if (configuredRetries < 0) {
        log.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
            BlobServerOptions.FETCH_RETRIES.key());
        this.numFetchRetries = 0;
    } else {
        this.numFetchRetries = configuredRetries;
    }

    // Add shutdown hook to delete storage directory
    shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), log);

    this.serverAddress = serverAddress;
}
示例25
/**
 * Registers a shutdown hook that calls {@code shutdown()} on the given cache.
 *
 * @param cache the cache whose {@code shutdown()} the hook invokes
 * @param logger logger handed on to {@code ShutdownHookUtil}
 * @return whatever {@code ShutdownHookUtil.addShutdownHook} returns for the registration
 */
private static Thread createShutdownHook(final FileCache cache, final Logger logger) {
    final String hookName = FileCache.class.getSimpleName();
    return ShutdownHookUtil.addShutdownHook(cache::shutdown, hookName, logger);
}
示例26
/**
 * Clean up of temporary directories created by the {@link ClusterEntrypoint}.
 *
 * <p>Deregisters the shutdown hook first and then deletes the directory configured
 * under {@code WebOptions.TMP_DIR}.
 *
 * @throws IOException if the temporary directories could not be cleaned up
 */
private void cleanupDirectories() throws IOException {
    ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG);

    final File webTmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
    FileUtils.deleteDirectory(webTmpDir);
}
示例27
public FileChannelManagerImpl(String[] tempDirs, String prefix) {
checkNotNull(tempDirs, "The temporary directories must not be null.");
checkArgument(tempDirs.length > 0, "The temporary directories must not be empty.");
this.random = new Random();
this.nextPath = 0;
this.prefix = prefix;
shutdownHook = ShutdownHookUtil.addShutdownHook(this, String.format("%s-%s", getClass().getSimpleName(), prefix), LOG);
// Creates directories after registering shutdown hook to ensure the directories can be
// removed if required.
this.paths = createFiles(tempDirs, prefix);
}
示例28
/**
 * Remove all the temp directories.
 *
 * <p>Only the first call does any work. The shutdown hook is deregistered even when
 * closing the directories fails: the shutdown flag is already set at that point, so a
 * hook left registered could never do anything useful.
 *
 * @throws Exception if closing one of the existing paths fails
 */
@Override
public void close() throws Exception {
    // Marks shutdown and exits if it has already shutdown.
    if (!isShutdown.compareAndSet(false, true)) {
        return;
    }

    try {
        IOUtils.closeAll(Arrays.stream(paths)
            .filter(File::exists)
            .map(FileChannelManagerImpl::getFileCloser)
            .collect(Collectors.toList()));
    } finally {
        ShutdownHookUtil.removeShutdownHook(shutdownHook, String.format("%s-%s", getClass().getSimpleName(), prefix), LOG);
    }
}
示例29
/**
 * Produces the execution result for a submitted job.
 *
 * <p>When {@code DeploymentOptions.ATTACHED} is not set, a detached result carrying only
 * the job ID is returned. Otherwise the call blocks until the job result is available and
 * prints it to standard out. If {@code DeploymentOptions.SHUTDOWN_IF_ATTACHED} is also set,
 * a shutdown hook that cancels the job is registered while waiting and removed once the
 * result future completes.
 *
 * @param jobClient client of the submitted job; must not be null
 * @return the job's execution result, or a detached result when not attached
 * @throws Exception if waiting for the job result fails
 */
private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
    checkNotNull(jobClient);

    if (!getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
        return new DetachedJobExecutionResult(jobClient.getJobID());
    }

    final CompletableFuture<JobExecutionResult> resultFuture =
        jobClient.getJobExecutionResult(getUserCodeClassLoader());

    if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
        final String hookName = ContextEnvironment.class.getSimpleName();
        final Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
            () -> {
                // wait a smidgen to allow the async request to go through before
                // the jvm exits
                jobClient.cancel().get(1, TimeUnit.SECONDS);
            },
            hookName,
            LOG);
        // The hook is only needed while the job is still running.
        resultFuture.whenComplete((ignored, throwable) ->
            ShutdownHookUtil.removeShutdownHook(shutdownHook, hookName, LOG));
    }

    final JobExecutionResult result = resultFuture.get();
    System.out.println(result);
    return result;
}
示例30
/**
 * Produces the execution result for a submitted job.
 *
 * <p>When {@code DeploymentOptions.ATTACHED} is not set, a detached result carrying only
 * the job ID is returned. Otherwise the call blocks until the job result is available and
 * prints it to standard out. If {@code DeploymentOptions.SHUTDOWN_IF_ATTACHED} is also set,
 * a shutdown hook that cancels the job is registered while waiting and removed once the
 * result future completes.
 *
 * @param jobClient client of the submitted job; must not be null
 * @return the job's execution result, or a detached result when not attached
 * @throws Exception if waiting for the job result fails
 */
private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
    checkNotNull(jobClient);

    if (!getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
        return new DetachedJobExecutionResult(jobClient.getJobID());
    }

    final CompletableFuture<JobExecutionResult> resultFuture =
        jobClient.getJobExecutionResult(getUserClassloader());

    if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
        final String hookName = StreamContextEnvironment.class.getSimpleName();
        final Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
            () -> {
                // wait a smidgen to allow the async request to go through before
                // the jvm exits
                jobClient.cancel().get(1, TimeUnit.SECONDS);
            },
            hookName,
            LOG);
        // The hook is only needed while the job is still running.
        resultFuture.whenComplete((ignored, throwable) ->
            ShutdownHookUtil.removeShutdownHook(shutdownHook, hookName, LOG));
    }

    final JobExecutionResult result = resultFuture.get();
    System.out.println(result);
    return result;
}