Java源码示例:org.apache.flink.runtime.clusterframework.ApplicationStatus

示例1
/**
 * Creates the dispatcher/resource-manager components, stores them in
 * {@code dispatcherResourceManagerComponents} and hooks into their shut-down futures.
 *
 * <p>Each component gets a {@code closeAsync} call chained (via {@code thenRun}) onto its own
 * shut-down future. In addition, {@code closeAsync} of this instance is chained onto the
 * future returned by {@code FutureUtils.completeAll} for all collected shut-down futures.
 *
 * @param configuration configuration passed on to the component creation
 * @param dispatcherResourceManagreComponentRpcServiceFactory RPC service factory passed on to the component creation
 * @param metricQueryServiceRetriever metric query service retriever passed on to the component creation
 * @throws Exception declared by this method; presumably propagated from the component creation
 */
@GuardedBy("lock")
private void setupDispatcherResourceManagerComponents(Configuration configuration, RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory, MetricQueryServiceRetriever metricQueryServiceRetriever) throws Exception {
	dispatcherResourceManagerComponents.addAll(
		createDispatcherResourceManagerComponents(
			configuration,
			dispatcherResourceManagreComponentRpcServiceFactory,
			haServices,
			blobServer,
			heartbeatServices,
			metricRegistry,
			metricQueryServiceRetriever,
			new ShutDownFatalErrorHandler()));

	final int componentCount = dispatcherResourceManagerComponents.size();
	final Collection<CompletableFuture<ApplicationStatus>> componentShutDowns = new ArrayList<>(componentCount);

	for (DispatcherResourceManagerComponent component : dispatcherResourceManagerComponents) {
		final CompletableFuture<ApplicationStatus> componentShutDown = component.getShutDownFuture();

		// close the individual component as soon as it signals its own shut down
		final CompletableFuture<Void> closeAfterShutDown = componentShutDown.thenRun(component::closeAsync);
		FutureUtils.assertNoException(closeAfterShutDown);

		componentShutDowns.add(componentShutDown);
	}

	// close this instance once the combined shut-down future has completed
	final CompletableFuture<Void> closeAfterAllShutDowns =
		FutureUtils.completeAll(componentShutDowns).thenRun(this::closeAsync);
	FutureUtils.assertNoException(closeAfterAllShutDowns);
}
 
示例2
/**
 * Verifies that the cluster shutdown function is invoked with {@link ApplicationStatus#CANCELED}
 * when the dispatcher gateway reports the job as cancelled.
 */
@Test
public void testDispatcherIsCancelledWhenOneJobIsCancelled() throws Exception {
	// completed by the gateway's cluster-shutdown hook with the status it was called with
	final CompletableFuture<ApplicationStatus> observedShutdownStatus = new CompletableFuture<>();

	final TestingDispatcherGateway.Builder gatewayBuilder = new TestingDispatcherGateway.Builder()
			.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
			.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.CANCELED))
			.setClusterShutdownFunction((status) -> {
				observedShutdownStatus.complete(status);
				return CompletableFuture.completedFuture(Acknowledge.get());
			})
			.setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createCancelledJobResult(jobId)));

	ApplicationDispatcherBootstrap applicationBootstrap = createApplicationDispatcherBootstrap(3);

	final CompletableFuture<Acknowledge> bootstrapDone =
			applicationBootstrap.runApplicationAndShutdownClusterAsync(gatewayBuilder.build(), scheduledExecutor);

	// Block until the bootstrap reports completion. A plain get() also ensures the future
	// was not completed exceptionally with a JobCancelledException.
	bootstrapDone.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

	final ApplicationStatus reportedStatus = observedShutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
	assertThat(reportedStatus, is(ApplicationStatus.CANCELED));
}
 
示例3
/**
 * Maps a Flink {@link ApplicationStatus} onto the matching YARN {@link FinalApplicationStatus}.
 *
 * <p>{@code SUCCEEDED} and {@code FAILED} map to their namesakes, {@code CANCELED} maps to
 * {@code KILLED}; {@code null} and every other value map to {@code UNDEFINED}.
 *
 * @param status The Flink application status, may be {@code null}.
 * @return The corresponding YARN application status.
 */
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
	// a switch on a null enum reference would throw, so deal with it up front
	if (status == null) {
		return FinalApplicationStatus.UNDEFINED;
	}

	final FinalApplicationStatus yarnStatus;
	switch (status) {
		case SUCCEEDED:
			yarnStatus = FinalApplicationStatus.SUCCEEDED;
			break;
		case FAILED:
			yarnStatus = FinalApplicationStatus.FAILED;
			break;
		case CANCELED:
			yarnStatus = FinalApplicationStatus.KILLED;
			break;
		default:
			yarnStatus = FinalApplicationStatus.UNDEFINED;
			break;
	}
	return yarnStatus;
}
 
示例4
/**
 * Polls the YARN client for the application report and updates {@code applicationStatus}.
 *
 * <p>If the client is not in state {@code STARTED}, the periodic update future is cancelled
 * instead. If the report cannot be fetched, the status becomes {@code UNKNOWN}. Otherwise the
 * YARN states {@code FAILED} and {@code KILLED} yield {@code FAILED}, and every other YARN state
 * yields {@code SUCCEEDED}.
 */
private void updateApplicationStatus() {
	if (!yarnClient.isInState(Service.STATE.STARTED)) {
		LOG.info("Yarn client is no longer in state STARTED. Stopping the Yarn application status monitor.");
		applicationStatusUpdateFuture.cancel(false);
		return;
	}

	final ApplicationReport report;
	try {
		report = yarnClient.getApplicationReport(yarnApplicationId);
	} catch (Exception e) {
		LOG.info("Could not retrieve the Yarn application report for {}.", yarnApplicationId);
		applicationStatus = ApplicationStatus.UNKNOWN;
		return;
	}

	final YarnApplicationState reportedState = report.getYarnApplicationState();
	final boolean failedOrKilled =
		reportedState == YarnApplicationState.FAILED || reportedState == YarnApplicationState.KILLED;

	applicationStatus = failedOrKilled ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
}
 
示例5
/**
 * Polls the YARN client for the application report and updates {@code applicationStatus}.
 *
 * <p>If the client is not in state {@code STARTED}, the periodic update future is cancelled
 * instead. If the report cannot be fetched, the status becomes {@code UNKNOWN}. Otherwise the
 * YARN states {@code FAILED} and {@code KILLED} yield {@code FAILED}, and every other YARN state
 * yields {@code SUCCEEDED}.
 */
private void updateApplicationStatus() {
	if (!yarnClient.isInState(Service.STATE.STARTED)) {
		LOG.info("Yarn client is no longer in state STARTED. Stopping the Yarn application status monitor.");
		applicationStatusUpdateFuture.cancel(false);
		return;
	}

	final ApplicationReport report;
	try {
		report = yarnClient.getApplicationReport(yarnApplicationId);
	} catch (Exception e) {
		LOG.info("Could not retrieve the Yarn application report for {}.", yarnApplicationId);
		applicationStatus = ApplicationStatus.UNKNOWN;
		return;
	}

	final YarnApplicationState reportedState = report.getYarnApplicationState();
	final boolean failedOrKilled =
		reportedState == YarnApplicationState.FAILED || reportedState == YarnApplicationState.KILLED;

	applicationStatus = failedOrKilled ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
}
 
示例6
/**
 * Requests the job result from the super class and, in {@code NORMAL} execution mode,
 * completes {@code shutDownFuture} once that result has been obtained successfully.
 *
 * <p>The shut-down status is {@code FAILED} if the result carries a serialized throwable and
 * {@code SUCCEEDED} otherwise.
 *
 * @param jobId id of the job whose result is requested
 * @param timeout timeout passed on to the super implementation
 * @return the future returned by the super implementation
 */
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
	final CompletableFuture<JobResult> jobResultFuture = super.requestJobResult(jobId, timeout);

	if (executionMode != ClusterEntrypoint.ExecutionMode.NORMAL) {
		LOG.debug("Not shutting down per-job cluster after someone retrieved the job result.");
		return jobResultFuture;
	}

	// terminate the MiniDispatcher after the first JobResult has been served successfully
	jobResultFuture.thenAccept(jobResult -> {
		final ApplicationStatus finalStatus;
		if (jobResult.getSerializedThrowable().isPresent()) {
			finalStatus = ApplicationStatus.FAILED;
		} else {
			finalStatus = ApplicationStatus.SUCCEEDED;
		}

		LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
		shutDownFuture.complete(finalStatus);
	});

	return jobResultFuture;
}
 
示例7
/**
 * Creates the component and forwards the outcome of the dispatcher's job termination future
 * (value or failure) to this component's shut-down future.
 */
JobDispatcherResourceManagerComponent(
		MiniDispatcher dispatcher,
		ResourceManager<?> resourceManager,
		LeaderRetrievalService dispatcherLeaderRetrievalService,
		LeaderRetrievalService resourceManagerRetrievalService,
		WebMonitorEndpoint<?> webMonitorEndpoint,
		JobManagerMetricGroup jobManagerMetricGroup) {
	super(dispatcher, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint, jobManagerMetricGroup);

	final CompletableFuture<ApplicationStatus> componentShutDown = getShutDownFuture();

	dispatcher.getJobTerminationFuture().whenComplete((terminationStatus, failure) -> {
		if (failure == null) {
			componentShutDown.complete(terminationStatus);
		} else {
			componentShutDown.completeExceptionally(failure);
		}
	});
}
 
示例8
/**
 * Links {@code shutDownFuture} to both this instance's {@code terminationFuture} and the
 * dispatcher's termination future: a failure of either is forwarded as an exceptional
 * completion, a normal completion is forwarded as {@link ApplicationStatus#SUCCEEDED}.
 */
private void registerShutDownFuture() {
	terminationFuture.whenComplete(
		(ignored, failure) -> {
			if (failure == null) {
				shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
			} else {
				shutDownFuture.completeExceptionally(failure);
			}
		});

	dispatcher
		.getTerminationFuture()
		.whenComplete(
			(ignored, failure) -> {
				if (failure == null) {
					shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
				} else {
					shutDownFuture.completeExceptionally(failure);
				}
			});
}
 
示例9
/**
 * Maps a Flink {@link ApplicationStatus} onto the matching YARN {@link FinalApplicationStatus}.
 *
 * <p>{@code SUCCEEDED} and {@code FAILED} map to their namesakes, {@code CANCELED} maps to
 * {@code KILLED}; {@code null} and every other value map to {@code UNDEFINED}.
 *
 * @param status The Flink application status, may be {@code null}.
 * @return The corresponding YARN application status.
 */
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
	// a switch on a null enum reference would throw, so deal with it up front
	if (status == null) {
		return FinalApplicationStatus.UNDEFINED;
	}

	final FinalApplicationStatus yarnStatus;
	switch (status) {
		case SUCCEEDED:
			yarnStatus = FinalApplicationStatus.SUCCEEDED;
			break;
		case FAILED:
			yarnStatus = FinalApplicationStatus.FAILED;
			break;
		case CANCELED:
			yarnStatus = FinalApplicationStatus.KILLED;
			break;
		default:
			yarnStatus = FinalApplicationStatus.UNDEFINED;
			break;
	}
	return yarnStatus;
}
 
示例10
/**
 * Requests the job result from the super class and, in {@code NORMAL} execution mode,
 * completes {@code jobTerminationFuture} once that result has been obtained successfully.
 *
 * <p>The termination status is {@code FAILED} if the result carries a serialized throwable
 * and {@code SUCCEEDED} otherwise.
 *
 * @param jobId id of the job whose result is requested
 * @param timeout timeout passed on to the super implementation
 * @return the future returned by the super implementation
 */
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
	final CompletableFuture<JobResult> jobResultFuture = super.requestJobResult(jobId, timeout);

	if (executionMode != ClusterEntrypoint.ExecutionMode.NORMAL) {
		return jobResultFuture;
	}

	// terminate the MiniDispatcher after the first JobResult has been served successfully
	jobResultFuture.thenAccept(jobResult -> {
		final ApplicationStatus finalStatus;
		if (jobResult.getSerializedThrowable().isPresent()) {
			finalStatus = ApplicationStatus.FAILED;
		} else {
			finalStatus = ApplicationStatus.SUCCEEDED;
		}

		jobTerminationFuture.complete(finalStatus);
	});

	return jobResultFuture;
}
 
示例11
/**
 * Links {@code shutDownFuture} to both this instance's {@code terminationFuture} and the
 * dispatcher's termination future: a failure of either is forwarded as an exceptional
 * completion, a normal completion is forwarded as {@link ApplicationStatus#SUCCEEDED}.
 */
private void registerShutDownFuture() {
	terminationFuture.whenComplete(
		(ignored, failure) -> {
			if (failure == null) {
				shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
			} else {
				shutDownFuture.completeExceptionally(failure);
			}
		});

	dispatcher
		.getTerminationFuture()
		.whenComplete(
			(ignored, failure) -> {
				if (failure == null) {
					shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
				} else {
					shutDownFuture.completeExceptionally(failure);
				}
			});
}
 
示例12
/**
 * Verifies that the cluster shutdown function is invoked with {@link ApplicationStatus#FAILED}
 * when the dispatcher gateway reports the job as failed.
 */
@Test
public void testClusterShutdownWhenApplicationFails() throws Exception {
	// Completed from the gateway's cluster-shutdown hook; observing it proves that the
	// ApplicationDispatcherBootstrap asked for the cluster to be shut down.
	final CompletableFuture<ApplicationStatus> observedShutdownStatus = new CompletableFuture<>();

	final TestingDispatcherGateway.Builder gatewayBuilder = new TestingDispatcherGateway.Builder()
			.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
			.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FAILED))
			.setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createFailedJobResult(jobId)))
			.setClusterShutdownFunction((status) -> {
				observedShutdownStatus.complete(status);
				return CompletableFuture.completedFuture(Acknowledge.get());
			});

	ApplicationDispatcherBootstrap applicationBootstrap = createApplicationDispatcherBootstrap(3);

	final CompletableFuture<Acknowledge> bootstrapDone =
			applicationBootstrap.runApplicationAndShutdownClusterAsync(gatewayBuilder.build(), scheduledExecutor);

	// block until the bootstrap reports completion
	bootstrapDone.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

	// the shutdown hook must have been called with the FAILED status
	final ApplicationStatus reportedStatus = observedShutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
	assertThat(reportedStatus, is(ApplicationStatus.FAILED));
}
 
示例13
/**
 * Creates a monitor that periodically refreshes the application status via
 * {@code updateApplicationStatus}.
 *
 * <p>The status starts out as {@link ApplicationStatus#UNKNOWN}. The first update is scheduled
 * with an initial delay of zero and subsequent updates follow with a fixed delay of
 * {@code UPDATE_INTERVAL} milliseconds.
 *
 * @param yarnClient client used by the update task, must not be {@code null}
 * @param yarnApplicationId id of the monitored YARN application, must not be {@code null}
 * @param scheduledExecutor executor on which the periodic update task is scheduled
 */
public YarnApplicationStatusMonitor(
		YarnClient yarnClient,
		ApplicationId yarnApplicationId,
		ScheduledExecutor scheduledExecutor) {
	this.yarnClient = Preconditions.checkNotNull(yarnClient);
	this.yarnApplicationId = Preconditions.checkNotNull(yarnApplicationId);

	// Set the initial status BEFORE scheduling the update task. The task is scheduled with a
	// zero initial delay, so it may run before this constructor returns. Assigning UNKNOWN
	// afterwards could overwrite a status the first update has already written.
	applicationStatus = ApplicationStatus.UNKNOWN;

	applicationStatusUpdateFuture = scheduledExecutor.scheduleWithFixedDelay(
		this::updateApplicationStatus,
		0L,
		UPDATE_INTERVAL,
		TimeUnit.MILLISECONDS);
}
 
示例14
/**
 * Starts the cluster: creates the plugin manager, configures the file systems, installs the
 * security context and runs the cluster inside that context.
 *
 * <p>If any of these steps throws, a shut down with {@link ApplicationStatus#FAILED} is
 * triggered and awaited for at most {@code INITIALIZATION_SHUTDOWN_TIMEOUT}. A failure of that
 * clean-up is attached to the original error as a suppressed exception, and the original error
 * is rethrown wrapped in a {@link ClusterEntrypointException}.
 *
 * @throws ClusterEntrypointException if the cluster could not be started
 */
public void startCluster() throws ClusterEntrypointException {
	LOG.info("Starting {}.", getClass().getSimpleName());

	try {
		PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
		configureFileSystems(configuration, pluginManager);

		SecurityContext securityContext = installSecurityContext(configuration);

		securityContext.runSecured((Callable<Void>) () -> {
			runCluster(configuration, pluginManager);

			return null;
		});
	} catch (Throwable t) {
		final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

		try {
			// clean up any partial state
			shutDownAsync(
				ApplicationStatus.FAILED,
				ExceptionUtils.stringifyException(strippedThrowable),
				false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			// restore the interrupt flag so that callers further up can still observe the interruption
			Thread.currentThread().interrupt();
			strippedThrowable.addSuppressed(e);
		} catch (ExecutionException | TimeoutException e) {
			strippedThrowable.addSuppressed(e);
		}

		throw new ClusterEntrypointException(
			String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
			strippedThrowable);
	}
}
 
示例15
/**
 * Delegates to the super implementation and, in {@code DETACHED} execution mode, completes
 * {@code jobTerminationFuture} with the application status derived from the job's state.
 *
 * @param archivedExecutionGraph execution graph of the job that reached the terminal state
 */
@Override
protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
	super.jobReachedGloballyTerminalState(archivedExecutionGraph);

	if (executionMode != ClusterEntrypoint.ExecutionMode.DETACHED) {
		return;
	}

	// in detached mode nobody will retrieve the execution result, so shut down right away
	final ApplicationStatus finalStatus = ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState());
	jobTerminationFuture.complete(finalStatus);
}
 
示例16
/**
 * Creates a job result from its individual parts.
 *
 * @param jobId id of the job, must not be {@code null}
 * @param applicationStatus status of the job, must not be {@code null}
 * @param accumulatorResults serialized accumulator results, must not be {@code null}
 * @param netRuntime net runtime of the job, must not be negative
 * @param serializedThrowable failure cause, may be {@code null}
 * @throws IllegalArgumentException if {@code netRuntime} is negative
 * @throws NullPointerException if one of the mandatory arguments is {@code null}
 */
private JobResult(
		final JobID jobId,
		final ApplicationStatus applicationStatus,
		final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults,
		final long netRuntime,
		@Nullable final SerializedThrowable serializedThrowable) {

	// validate the runtime first, then store it
	checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");
	this.netRuntime = netRuntime;

	// mandatory references
	this.jobId = requireNonNull(jobId);
	this.applicationStatus = requireNonNull(applicationStatus);
	this.accumulatorResults = requireNonNull(accumulatorResults);

	// optional failure cause
	this.serializedThrowable = serializedThrowable;
}
 
示例17
/**
 * Builds a {@link JobResult} out of an {@link AccessExecutionGraph} that has reached a
 * globally terminal state.
 *
 * <p>The net runtime is the difference between the timestamp of the terminal state and the
 * timestamp of {@code CREATED}, clamped to zero. The failure cause is only copied over when
 * the job did not finish and failure information is available.
 *
 * @param accessExecutionGraph to create the JobResult from
 * @return JobResult of the given AccessExecutionGraph
 * @throws IllegalArgumentException if the job is not in a globally terminal state
 */
public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
	final JobID jobId = accessExecutionGraph.getJobID();
	final JobStatus jobStatus = accessExecutionGraph.getState();

	checkArgument(
		jobStatus.isGloballyTerminalState(),
		"The job " + accessExecutionGraph.getJobName() + '(' + jobId + ") is not in a globally " +
			"terminal state. It is in state " + jobStatus + '.');

	final JobResult.Builder resultBuilder = new JobResult.Builder();
	resultBuilder.jobId(jobId);

	final ApplicationStatus applicationStatus = ApplicationStatus.fromJobStatus(accessExecutionGraph.getState());
	resultBuilder.applicationStatus(applicationStatus);

	final long terminalTimestamp = accessExecutionGraph.getStatusTimestamp(jobStatus);
	final long createdTimestamp = accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
	final long rawNetRuntime = terminalTimestamp - createdTimestamp;
	// clock adjustments may produce a negative difference; never report less than zero
	resultBuilder.netRuntime(rawNetRuntime < 0L ? 0L : rawNetRuntime);
	resultBuilder.accumulatorResults(accessExecutionGraph.getAccumulatorsSerialized());

	if (jobStatus == JobStatus.FINISHED) {
		return resultBuilder.build();
	}

	final ErrorInfo failureInfo = accessExecutionGraph.getFailureInfo();
	if (failureInfo != null) {
		resultBuilder.serializedThrowable(failureInfo.getException());
	}

	return resultBuilder.build();
}
 
示例18
/**
 * Deregisters the application and acknowledges the request.
 *
 * <p>A {@link ResourceManagerException} raised by the internal deregistration is logged as a
 * warning and does not fail the returned future.
 *
 * @param finalStatus of the Flink application
 * @param diagnostics diagnostics message for the Flink application or {@code null}
 * @return an already completed future holding the acknowledgement
 */
@Override
public CompletableFuture<Acknowledge> deregisterApplication(
		final ApplicationStatus finalStatus,
		@Nullable final String diagnostics) {
	log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, diagnostics);

	try {
		internalDeregisterApplication(finalStatus, diagnostics);
	} catch (ResourceManagerException deregistrationFailure) {
		// best effort: the request is acknowledged even if the deregistration failed
		log.warn("Could not properly shutdown the application.", deregistrationFailure);
	}

	final Acknowledge acknowledgement = Acknowledge.get();
	return CompletableFuture.completedFuture(acknowledgement);
}
 
示例19
/**
 * Creates a {@link JobResult} for the given job with status {@link ApplicationStatus#CANCELED},
 * a net runtime of 2 and a serialized {@link JobCancellationException} as failure cause.
 */
private static JobResult createCancelledJobResult(final JobID jobId) {
	final JobCancellationException cancellation = new JobCancellationException(jobId, "Hello", null);
	final SerializedThrowable serializedCancellation = new SerializedThrowable(cancellation);

	final JobResult.Builder resultBuilder = new JobResult.Builder()
			.jobId(jobId)
			.netRuntime(2L)
			.serializedThrowable(serializedCancellation)
			.applicationStatus(ApplicationStatus.CANCELED);

	return resultBuilder.build();
}
 
示例20
/**
 * Verifies that the runner's shut-down future is not completed within the wait window when the
 * leader process's shut-down future completes after {@code closeAsync} has been called on the
 * runner.
 */
@Test
public void getShutDownFuture_afterClose_ignoresDispatcherLeaderProcessShutDownRequest() throws Exception {
	final UUID sessionId = UUID.randomUUID();
	final CompletableFuture<ApplicationStatus> leaderProcessShutDown = new CompletableFuture<>();
	final TestingDispatcherLeaderProcess leaderProcess = TestingDispatcherLeaderProcess.newBuilder(sessionId)
		.setShutDownFuture(leaderProcessShutDown)
		.build();
	testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(leaderProcess);

	try (final DispatcherRunner runner = createDispatcherRunner()) {
		testingLeaderElectionService.isLeader(sessionId);

		final CompletableFuture<ApplicationStatus> runnerShutDown = runner.getShutDownFuture();

		assertFalse(runnerShutDown.isDone());

		runner.closeAsync();

		// the shut down request arrives only after the runner has been closed
		leaderProcessShutDown.complete(ApplicationStatus.UNKNOWN);

		try {
			runnerShutDown.get(10L, TimeUnit.MILLISECONDS);
			fail("The dispatcher runner should no longer react to the dispatcher leader process's shut down request if it has been terminated.");
		} catch (TimeoutException expected) {}
	}
}
 
示例21
/**
 * Deregisters the application and closes the cluster component, if one has been set.
 *
 * <p>The check and the call are performed while holding {@code lock}.
 *
 * @param applicationStatus to terminate the application with
 * @param diagnostics additional information about the shut down, can be {@code null}
 * @return the future returned by the cluster component, or an already completed future if
 *     there is no cluster component
 */
private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
	synchronized (lock) {
		if (clusterComponent == null) {
			// nothing to close
			return CompletableFuture.completedFuture(null);
		}

		return clusterComponent.deregisterApplicationAndClose(applicationStatus, diagnostics);
	}
}
 
示例22
/**
 * Closes the web monitor endpoint, then deregisters the Flink application and finally runs
 * the internal close routine.
 *
 * <p>Only the first call that flips {@code isRunning} from {@code true} to {@code false}
 * starts this sequence; any other call just returns {@code terminationFuture}.
 *
 * @param applicationStatus to terminate the application with
 * @param diagnostics additional information about the shut down, can be {@code null}
 * @return Future of the composed shut down sequence, or {@code terminationFuture} if the
 *     component was no longer running
 */
public CompletableFuture<Void> deregisterApplicationAndClose(
		final ApplicationStatus applicationStatus,
		final @Nullable String diagnostics) {

	if (!isRunning.compareAndSet(true, false)) {
		return terminationFuture;
	}

	// step 1 + 2: close the web monitor, afterwards deregister the application
	final CompletableFuture<Void> deregisteredAfterWebMonitorClosed = FutureUtils.composeAfterwards(
		webMonitorEndpoint.closeAsync(),
		() -> deregisterApplication(applicationStatus, diagnostics));

	// step 3: run the internal close routine
	return FutureUtils.composeAfterwards(deregisteredAfterWebMonitorClosed, this::closeAsyncInternal);
}
 
示例23
/**
 * Closes the web monitor endpoint, then deregisters the Flink application and finally runs
 * the internal close routine.
 *
 * <p>Only the first call that flips {@code isRunning} from {@code true} to {@code false}
 * starts this sequence; any other call just returns {@code terminationFuture}.
 *
 * @param applicationStatus to terminate the application with
 * @param diagnostics additional information about the shut down, can be {@code null}
 * @return Future of the composed shut down sequence, or {@code terminationFuture} if the
 *     component was no longer running
 */
public CompletableFuture<Void> deregisterApplicationAndClose(
		final ApplicationStatus applicationStatus,
		final @Nullable String diagnostics) {

	if (!isRunning.compareAndSet(true, false)) {
		return terminationFuture;
	}

	// step 1 + 2: close the web monitor, afterwards deregister the application
	final CompletableFuture<Void> deregisteredAfterWebMonitorClosed = FutureUtils.composeAfterwards(
		webMonitorEndpoint.closeAsync(),
		() -> deregisterApplication(applicationStatus, diagnostics));

	// step 3: run the internal close routine
	return FutureUtils.composeAfterwards(deregisteredAfterWebMonitorClosed, this::closeAsyncInternal);
}
 
示例24
/**
 * Submits, cancels and stops a job through the {@link RestClusterClient} and checks after each
 * call that the corresponding test handler flag has been set.
 */
@Test
public void testJobSubmitCancelStop() throws Exception {
	TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
	TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();

	final JobResult succeededResult = new JobResult.Builder()
		.applicationStatus(ApplicationStatus.SUCCEEDED)
		.jobId(jobId)
		.netRuntime(Long.MAX_VALUE)
		.build();
	TestJobExecutionResultHandler executionResultHandler =
		new TestJobExecutionResultHandler(JobExecutionResultResponseBody.created(succeededResult));

	try (TestRestServerEndpoint serverEndpoint = createRestServerEndpoint(
		submitHandler,
		terminationHandler,
		executionResultHandler)) {
		RestClusterClient<?> client = createRestClusterClient(serverEndpoint.getServerAddress().getPort());

		try {
			// submit
			Assert.assertFalse(submitHandler.jobSubmitted);
			client.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
			Assert.assertTrue(submitHandler.jobSubmitted);

			// cancel
			Assert.assertFalse(terminationHandler.jobCanceled);
			client.cancel(jobId);
			Assert.assertTrue(terminationHandler.jobCanceled);

			// stop
			Assert.assertFalse(terminationHandler.jobStopped);
			client.stop(jobId);
			Assert.assertTrue(terminationHandler.jobStopped);
		} finally {
			client.shutdown();
		}
	}
}
 
示例25
/**
 * Checks that the directory registered under {@code FLINK_YARN_FILES} no longer exists after
 * the application has been deregistered from the resource manager.
 */
@Test
public void testDeleteApplicationFiles() throws Exception {
	new Context() {{
		final File yarnFilesDir = folder.newFolder(".flink");
		env.put(FLINK_YARN_FILES, yarnFilesDir.getCanonicalPath());

		runTest(() -> {
			resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);

			final boolean directoryStillExists = Files.exists(yarnFilesDir.toPath());
			assertFalse("YARN application directory was not removed", directoryStillExists);
		});
	}};
}
 
示例26
/**
 * Checks that the directory registered under {@code FLINK_YARN_FILES} no longer exists after
 * the application has been deregistered from the resource manager.
 */
@Test
public void testDeleteApplicationFiles() throws Exception {
	new Context() {{
		final File yarnFilesDir = folder.newFolder(".flink");
		env.put(FLINK_YARN_FILES, yarnFilesDir.getCanonicalPath());

		runTest(() -> {
			resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);

			final boolean directoryStillExists = Files.exists(yarnFilesDir.toPath());
			assertFalse("YARN application directory was not removed", directoryStillExists);
		});
	}};
}
 
示例27
/**
 * Deregisters the application through the resource manager's self gateway.
 *
 * @param applicationStatus status passed to the gateway
 * @param diagnostics diagnostics passed to the gateway, can be {@code null}
 * @return future that yields {@code null} once the gateway's response has arrived
 */
private CompletableFuture<Void> deregisterApplication(
		final ApplicationStatus applicationStatus,
		final @Nullable String diagnostics) {

	return resourceManager
		.getSelfGateway(ResourceManagerGateway.class)
		.deregisterApplication(applicationStatus, diagnostics)
		// callers only care about completion, so the acknowledgement is discarded
		.thenApply(ack -> null);
}
 
示例28
/**
 * Delegates to the super implementation and then completes {@code jobTerminationFuture} with
 * {@link ApplicationStatus#UNKNOWN}.
 *
 * @param jobId id of the job that did not finish
 */
@Override
protected void jobNotFinished(JobID jobId) {
	super.jobNotFinished(jobId);

	// our work is done, so trigger the shut down
	final ApplicationStatus terminationStatus = ApplicationStatus.UNKNOWN;
	jobTerminationFuture.complete(terminationStatus);
}
 
示例29
/**
 * Builds a {@link JobResult} out of an {@link AccessExecutionGraph} that has reached a
 * globally terminal state.
 *
 * <p>The net runtime is the difference between the timestamp of the terminal state and the
 * timestamp of {@code CREATED}, clamped to zero. The failure cause is only copied over when
 * the job did not finish and failure information is available.
 *
 * @param accessExecutionGraph to create the JobResult from
 * @return JobResult of the given AccessExecutionGraph
 * @throws IllegalArgumentException if the job is not in a globally terminal state
 */
public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
	final JobID jobId = accessExecutionGraph.getJobID();
	final JobStatus jobStatus = accessExecutionGraph.getState();

	checkArgument(
		jobStatus.isGloballyTerminalState(),
		"The job " + accessExecutionGraph.getJobName() + '(' + jobId + ") is not in a globally " +
			"terminal state. It is in state " + jobStatus + '.');

	final JobResult.Builder resultBuilder = new JobResult.Builder();
	resultBuilder.jobId(jobId);

	final ApplicationStatus applicationStatus = ApplicationStatus.fromJobStatus(accessExecutionGraph.getState());
	resultBuilder.applicationStatus(applicationStatus);

	final long terminalTimestamp = accessExecutionGraph.getStatusTimestamp(jobStatus);
	final long createdTimestamp = accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
	final long rawNetRuntime = terminalTimestamp - createdTimestamp;
	// clock adjustments may produce a negative difference; never report less than zero
	resultBuilder.netRuntime(rawNetRuntime < 0L ? 0L : rawNetRuntime);
	resultBuilder.accumulatorResults(accessExecutionGraph.getAccumulatorsSerialized());

	if (jobStatus == JobStatus.FINISHED) {
		return resultBuilder.build();
	}

	final ErrorInfo failureInfo = accessExecutionGraph.getFailureInfo();
	if (failureInfo != null) {
		resultBuilder.serializedThrowable(failureInfo.getException());
	}

	return resultBuilder.build();
}
 
示例30
/**
 * Deregisters the application and acknowledges the request.
 *
 * <p>A {@link ResourceManagerException} raised by the internal deregistration is logged as a
 * warning and does not fail the returned future.
 *
 * @param finalStatus of the Flink application
 * @param diagnostics diagnostics message for the Flink application or {@code null}
 * @return an already completed future holding the acknowledgement
 */
@Override
public CompletableFuture<Acknowledge> deregisterApplication(
		final ApplicationStatus finalStatus,
		@Nullable final String diagnostics) {
	log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, diagnostics);

	try {
		internalDeregisterApplication(finalStatus, diagnostics);
	} catch (ResourceManagerException deregistrationFailure) {
		// best effort: the request is acknowledged even if the deregistration failed
		log.warn("Could not properly shutdown the application.", deregistrationFailure);
	}

	final Acknowledge acknowledgement = Acknowledge.get();
	return CompletableFuture.completedFuture(acknowledgement);
}