Java源码示例:org.apache.flink.runtime.clusterframework.ApplicationStatus
示例1
/**
 * Creates the dispatcher/resource-manager components and wires up their shut-down futures.
 *
 * <p>Every component is asked to close itself once its own shut-down future completes
 * normally, and this instance is asked to close itself once the future returned by
 * {@code FutureUtils.completeAll} over all of those shut-down futures completes normally.
 *
 * @param configuration configuration handed to the component factory
 * @param dispatcherResourceManagreComponentRpcServiceFactory factory for the components' RPC services
 * @param metricQueryServiceRetriever retriever handed to the component factory
 * @throws Exception if the components cannot be created
 */
@GuardedBy("lock")
private void setupDispatcherResourceManagerComponents(Configuration configuration, RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory, MetricQueryServiceRetriever metricQueryServiceRetriever) throws Exception {
    dispatcherResourceManagerComponents.addAll(
        createDispatcherResourceManagerComponents(
            configuration,
            dispatcherResourceManagreComponentRpcServiceFactory,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            metricQueryServiceRetriever,
            new ShutDownFatalErrorHandler()));

    final Collection<CompletableFuture<ApplicationStatus>> componentShutDownFutures =
        new ArrayList<>(dispatcherResourceManagerComponents.size());

    for (DispatcherResourceManagerComponent component : dispatcherResourceManagerComponents) {
        final CompletableFuture<ApplicationStatus> componentShutDownFuture = component.getShutDownFuture();
        // A component that signalled shut-down is closed right away.
        FutureUtils.assertNoException(componentShutDownFuture.thenRun(component::closeAsync));
        componentShutDownFutures.add(componentShutDownFuture);
    }

    // Close this instance as well once all components have signalled shut-down.
    FutureUtils.assertNoException(
        FutureUtils.completeAll(componentShutDownFutures).thenRun(this::closeAsync));
}
示例2
/**
 * Verifies that the cluster shut-down function is invoked with
 * {@link ApplicationStatus#CANCELED} when the gateway reports a cancelled job.
 */
@Test
public void testDispatcherIsCancelledWhenOneJobIsCancelled() throws Exception {
    // Completed by the gateway's cluster shut-down function with the status it was called with.
    final CompletableFuture<ApplicationStatus> clusterShutdownStatus = new CompletableFuture<>();

    final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
        .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
        .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.CANCELED))
        .setClusterShutdownFunction(status -> {
            clusterShutdownStatus.complete(status);
            return CompletableFuture.completedFuture(Acknowledge.get());
        })
        .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createCancelledJobResult(jobId)));

    final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);

    final CompletableFuture<Acknowledge> shutdownFuture =
        bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);

    // Wait until the bootstrap "thinks" it's done; this also makes sure that we don't
    // fail the future exceptionally with a JobCancelledException.
    shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

    final ApplicationStatus reportedStatus = clusterShutdownStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    assertThat(reportedStatus, is(ApplicationStatus.CANCELED));
}
示例3
/**
 * Maps a Flink application status onto the matching YARN final application status.
 *
 * @param status the Flink application status, may be {@code null}
 * @return the YARN status; {@code UNDEFINED} for {@code null} and for any Flink status
 *     other than SUCCEEDED, FAILED or CANCELED
 */
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
    if (status == null) {
        return FinalApplicationStatus.UNDEFINED;
    }

    final FinalApplicationStatus yarnStatus;
    switch (status) {
        case SUCCEEDED:
            yarnStatus = FinalApplicationStatus.SUCCEEDED;
            break;
        case FAILED:
            yarnStatus = FinalApplicationStatus.FAILED;
            break;
        case CANCELED:
            // A cancelled Flink application is reported to YARN as KILLED.
            yarnStatus = FinalApplicationStatus.KILLED;
            break;
        default:
            yarnStatus = FinalApplicationStatus.UNDEFINED;
            break;
    }
    return yarnStatus;
}
示例4
/**
 * Refreshes {@code applicationStatus} from the YARN application report.
 *
 * <p>While the YARN client is in state STARTED, the report for {@code yarnApplicationId} is
 * fetched: YARN states FAILED and KILLED map to {@link ApplicationStatus#FAILED}, every other
 * state maps to {@link ApplicationStatus#SUCCEEDED}. If the report cannot be fetched the
 * status becomes {@link ApplicationStatus#UNKNOWN}. Once the client has left state STARTED,
 * the periodic update future is cancelled instead.
 */
private void updateApplicationStatus() {
    if (!yarnClient.isInState(Service.STATE.STARTED)) {
        LOG.info("Yarn client is no longer in state STARTED. Stopping the Yarn application status monitor.");
        applicationStatusUpdateFuture.cancel(false);
        return;
    }

    final ApplicationReport applicationReport;
    try {
        applicationReport = yarnClient.getApplicationReport(yarnApplicationId);
    } catch (Exception e) {
        // Pass the exception as the trailing argument so the cause is not lost from the log.
        LOG.info("Could not retrieve the Yarn application report for {}.", yarnApplicationId, e);
        applicationStatus = ApplicationStatus.UNKNOWN;
        return;
    }

    final YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
    if (yarnApplicationState == YarnApplicationState.FAILED || yarnApplicationState == YarnApplicationState.KILLED) {
        applicationStatus = ApplicationStatus.FAILED;
    } else {
        applicationStatus = ApplicationStatus.SUCCEEDED;
    }
}
示例5
/**
 * Polls YARN for the application report and updates {@code applicationStatus} accordingly.
 *
 * <p>FAILED and KILLED YARN states are recorded as {@link ApplicationStatus#FAILED}; any other
 * state is recorded as {@link ApplicationStatus#SUCCEEDED}. A failure to fetch the report is
 * recorded as {@link ApplicationStatus#UNKNOWN}. If the YARN client is no longer in state
 * STARTED, the periodic update future is cancelled.
 */
private void updateApplicationStatus() {
    if (yarnClient.isInState(Service.STATE.STARTED)) {
        final ApplicationReport applicationReport;

        try {
            applicationReport = yarnClient.getApplicationReport(yarnApplicationId);
        } catch (Exception e) {
            // Log the cause as well; previously the exception was silently dropped.
            LOG.info("Could not retrieve the Yarn application report for {}.", yarnApplicationId, e);
            applicationStatus = ApplicationStatus.UNKNOWN;
            return;
        }

        final YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
        final boolean failedOrKilled =
            yarnApplicationState == YarnApplicationState.FAILED
                || yarnApplicationState == YarnApplicationState.KILLED;

        applicationStatus = failedOrKilled ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
    } else {
        LOG.info("Yarn client is no longer in state STARTED. Stopping the Yarn application status monitor.");
        applicationStatusUpdateFuture.cancel(false);
    }
}
示例6
/**
 * Requests the job result and, in execution mode NORMAL, completes {@code shutDownFuture}
 * once that result has been served successfully.
 *
 * @param jobId id of the job whose result is requested
 * @param timeout timeout passed to the super implementation
 * @return the future returned by the super implementation
 */
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
    final CompletableFuture<JobResult> resultFuture = super.requestJobResult(jobId, timeout);

    if (executionMode != ClusterEntrypoint.ExecutionMode.NORMAL) {
        LOG.debug("Not shutting down per-job cluster after someone retrieved the job result.");
        return resultFuture;
    }

    // Terminate the MiniDispatcher once we served the first JobResult successfully.
    resultFuture.thenAccept(jobResult -> {
        final ApplicationStatus finalStatus;
        if (jobResult.getSerializedThrowable().isPresent()) {
            finalStatus = ApplicationStatus.FAILED;
        } else {
            finalStatus = ApplicationStatus.SUCCEEDED;
        }

        LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
        shutDownFuture.complete(finalStatus);
    });

    return resultFuture;
}
示例7
/**
 * Creates the component and forwards the outcome of the dispatcher's job termination future
 * (value or failure) to this component's shut-down future.
 */
JobDispatcherResourceManagerComponent(
        MiniDispatcher dispatcher,
        ResourceManager<?> resourceManager,
        LeaderRetrievalService dispatcherLeaderRetrievalService,
        LeaderRetrievalService resourceManagerRetrievalService,
        WebMonitorEndpoint<?> webMonitorEndpoint,
        JobManagerMetricGroup jobManagerMetricGroup) {
    super(
        dispatcher,
        resourceManager,
        dispatcherLeaderRetrievalService,
        resourceManagerRetrievalService,
        webMonitorEndpoint,
        jobManagerMetricGroup);

    final CompletableFuture<ApplicationStatus> componentShutDownFuture = getShutDownFuture();

    dispatcher.getJobTerminationFuture().whenComplete((finalStatus, failure) -> {
        if (failure == null) {
            componentShutDownFuture.complete(finalStatus);
        } else {
            componentShutDownFuture.completeExceptionally(failure);
        }
    });
}
示例8
/**
 * Completes {@code shutDownFuture} when either this instance's termination future or the
 * dispatcher's termination future completes.
 */
private void registerShutDownFuture() {
    terminationFuture.whenComplete(
        (ignored, failure) -> forwardTerminationToShutDownFuture(failure));

    dispatcher
        .getTerminationFuture()
        .whenComplete((ignored, failure) -> forwardTerminationToShutDownFuture(failure));
}

/**
 * Completes {@code shutDownFuture} exceptionally with the given failure, or with
 * {@link ApplicationStatus#SUCCEEDED} when the failure is {@code null}.
 */
private void forwardTerminationToShutDownFuture(Throwable failure) {
    if (failure != null) {
        shutDownFuture.completeExceptionally(failure);
    } else {
        shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
    }
}
示例9
/**
 * Translates a Flink application status into its YARN counterpart.
 *
 * @param status the Flink application status, may be {@code null}
 * @return SUCCEEDED, FAILED or KILLED for the Flink statuses SUCCEEDED, FAILED and CANCELED
 *     respectively; {@code UNDEFINED} for {@code null} and every other status
 */
private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
    if (status == ApplicationStatus.SUCCEEDED) {
        return FinalApplicationStatus.SUCCEEDED;
    }
    if (status == ApplicationStatus.FAILED) {
        return FinalApplicationStatus.FAILED;
    }
    if (status == ApplicationStatus.CANCELED) {
        // YARN has no "cancelled" final status; KILLED is used instead.
        return FinalApplicationStatus.KILLED;
    }
    // Covers null as well as every status without a dedicated mapping.
    return FinalApplicationStatus.UNDEFINED;
}
示例10
/**
 * Requests the job result and, in execution mode NORMAL, completes
 * {@code jobTerminationFuture} once that result has been served successfully.
 *
 * @param jobId id of the job whose result is requested
 * @param timeout timeout passed to the super implementation
 * @return the future returned by the super implementation
 */
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
    final CompletableFuture<JobResult> resultFuture = super.requestJobResult(jobId, timeout);

    if (executionMode != ClusterEntrypoint.ExecutionMode.NORMAL) {
        return resultFuture;
    }

    // Terminate the MiniDispatcher once we served the first JobResult successfully.
    resultFuture.thenAccept(jobResult -> {
        final boolean jobFailed = jobResult.getSerializedThrowable().isPresent();
        jobTerminationFuture.complete(jobFailed ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED);
    });

    return resultFuture;
}
示例11
/**
 * Hooks {@code shutDownFuture} up to both this instance's termination future and the
 * dispatcher's termination future: a failure is propagated exceptionally, a normal
 * completion results in {@link ApplicationStatus#SUCCEEDED}.
 */
private void registerShutDownFuture() {
    terminationFuture.whenComplete((ignored, failure) -> {
        if (failure == null) {
            shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
        } else {
            shutDownFuture.completeExceptionally(failure);
        }
    });

    dispatcher.getTerminationFuture().whenComplete((ignored, failure) -> {
        if (failure == null) {
            shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
        } else {
            shutDownFuture.completeExceptionally(failure);
        }
    });
}
示例12
/**
 * Verifies that the cluster shut-down function is invoked with
 * {@link ApplicationStatus#FAILED} when the gateway reports a failed job.
 */
@Test
public void testClusterShutdownWhenApplicationFails() throws Exception {
    // We're "listening" on this future to verify that the cluster is being shut down
    // from the ApplicationDispatcherBootstrap.
    final CompletableFuture<ApplicationStatus> externalShutdownFuture = new CompletableFuture<>();

    final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
        .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
        .setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FAILED))
        .setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createFailedJobResult(jobId)))
        .setClusterShutdownFunction(status -> {
            externalShutdownFuture.complete(status);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });

    final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);

    final CompletableFuture<Acknowledge> shutdownFuture =
        bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);

    // Wait until the bootstrap "thinks" it's done.
    shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

    // Verify that the dispatcher is actually being shut down.
    final ApplicationStatus reportedStatus = externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    assertThat(reportedStatus, is(ApplicationStatus.FAILED));
}
示例13
/**
 * Creates a monitor that periodically refreshes the status of the given YARN application.
 *
 * <p>The status starts out as {@link ApplicationStatus#UNKNOWN}. The update task is
 * scheduled on the given executor with an initial delay of zero and a fixed delay of
 * {@code UPDATE_INTERVAL} milliseconds.
 *
 * @param yarnClient client used to query YARN, must not be {@code null}
 * @param yarnApplicationId id of the application to monitor, must not be {@code null}
 * @param scheduledExecutor executor on which the periodic update task runs
 */
public YarnApplicationStatusMonitor(
        YarnClient yarnClient,
        ApplicationId yarnApplicationId,
        ScheduledExecutor scheduledExecutor) {
    this.yarnClient = Preconditions.checkNotNull(yarnClient);
    this.yarnApplicationId = Preconditions.checkNotNull(yarnApplicationId);

    // Set the initial status BEFORE scheduling. The task is scheduled with zero initial
    // delay, so it may run before this constructor returns; assigning UNKNOWN afterwards
    // could overwrite a status the task has just written.
    applicationStatus = ApplicationStatus.UNKNOWN;

    applicationStatusUpdateFuture = scheduledExecutor.scheduleWithFixedDelay(
        this::updateApplicationStatus,
        0L,
        UPDATE_INTERVAL,
        TimeUnit.MILLISECONDS);
}
示例14
/**
 * Starts the cluster entrypoint: creates the plugin manager, configures the file systems,
 * installs the security context and runs the cluster inside it.
 *
 * <p>If anything fails, a shut-down with status {@link ApplicationStatus#FAILED} is triggered
 * and awaited for at most {@code INITIALIZATION_SHUTDOWN_TIMEOUT}; a failure of that clean-up
 * is attached to the original error as a suppressed exception.
 *
 * @throws ClusterEntrypointException if the cluster entrypoint could not be initialized
 */
public void startCluster() throws ClusterEntrypointException {
    LOG.info("Starting {}.", getClass().getSimpleName());

    try {
        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
        configureFileSystems(configuration, pluginManager);

        SecurityContext securityContext = installSecurityContext(configuration);

        securityContext.runSecured((Callable<Void>) () -> {
            runCluster(configuration, pluginManager);
            return null;
        });
    } catch (Throwable t) {
        final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

        try {
            // clean up any partial state
            shutDownAsync(
                ApplicationStatus.FAILED,
                ExceptionUtils.stringifyException(strippedThrowable),
                false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still observe the interruption.
            Thread.currentThread().interrupt();
            strippedThrowable.addSuppressed(e);
        } catch (ExecutionException | TimeoutException e) {
            strippedThrowable.addSuppressed(e);
        }

        throw new ClusterEntrypointException(
            String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
            strippedThrowable);
    }
}
示例15
/**
 * Delegates to the super implementation and, in execution mode DETACHED, completes
 * {@code jobTerminationFuture} with the application status derived from the job's state.
 *
 * @param archivedExecutionGraph the archived graph of the terminated job
 */
@Override
protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
    super.jobReachedGloballyTerminalState(archivedExecutionGraph);

    if (executionMode != ClusterEntrypoint.ExecutionMode.DETACHED) {
        return;
    }

    // Shut down since we don't have to wait for the execution result retrieval.
    final ApplicationStatus finalStatus = ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState());
    jobTerminationFuture.complete(finalStatus);
}
示例16
/**
 * Creates a job result.
 *
 * @param jobId id of the job, must not be {@code null}
 * @param applicationStatus final status of the job, must not be {@code null}
 * @param accumulatorResults serialized accumulator results, must not be {@code null}
 * @param netRuntime net runtime of the job, must be {@code >= 0} (enforced via {@code checkArgument})
 * @param serializedThrowable failure cause, or {@code null} if there is none
 * @throws NullPointerException if a required argument is {@code null}; the message names the argument
 */
private JobResult(
        final JobID jobId,
        final ApplicationStatus applicationStatus,
        final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults,
        final long netRuntime,
        @Nullable final SerializedThrowable serializedThrowable) {

    checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");

    // Name the offending argument so that a NullPointerException is self-explanatory.
    this.jobId = requireNonNull(jobId, "jobId");
    this.applicationStatus = requireNonNull(applicationStatus, "applicationStatus");
    this.accumulatorResults = requireNonNull(accumulatorResults, "accumulatorResults");
    this.netRuntime = netRuntime;
    this.serializedThrowable = serializedThrowable;
}
示例17
/**
 * Creates the {@link JobResult} from the given {@link AccessExecutionGraph} which
 * must be in a globally terminal state.
 *
 * <p>The net runtime is the difference between the timestamp of the terminal state and the
 * timestamp of {@link JobStatus#CREATED}, clamped to zero. For every state other than
 * {@link JobStatus#FINISHED} the graph's failure info, if present, is attached to the result.
 *
 * @param accessExecutionGraph to create the JobResult from
 * @return JobResult of the given AccessExecutionGraph
 */
public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
    final JobID jobId = accessExecutionGraph.getJobID();
    final JobStatus jobStatus = accessExecutionGraph.getState();

    checkArgument(
        jobStatus.isGloballyTerminalState(),
        "The job " + accessExecutionGraph.getJobName() + '(' + jobId + ") is not in a globally " +
            "terminal state. It is in state " + jobStatus + '.');

    final JobResult.Builder builder = new JobResult.Builder();
    builder.jobId(jobId);

    // Derive the application status from the state that was validated above rather than
    // reading the state from the graph a second time.
    builder.applicationStatus(ApplicationStatus.fromJobStatus(jobStatus));

    final long netRuntime =
        accessExecutionGraph.getStatusTimestamp(jobStatus) - accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
    // guard against clock changes
    final long guardedNetRuntime = Math.max(netRuntime, 0L);
    builder.netRuntime(guardedNetRuntime);

    builder.accumulatorResults(accessExecutionGraph.getAccumulatorsSerialized());

    if (jobStatus != JobStatus.FINISHED) {
        final ErrorInfo errorInfo = accessExecutionGraph.getFailureInfo();

        if (errorInfo != null) {
            builder.serializedThrowable(errorInfo.getException());
        }
    }

    return builder.build();
}
示例18
/**
 * Cleans up the application and shuts down the cluster.
 *
 * <p>A {@code ResourceManagerException} raised during deregistration is logged as a warning
 * and otherwise ignored; the returned future is always already completed.
 *
 * @param finalStatus of the Flink application
 * @param diagnostics diagnostics message for the Flink application or {@code null}
 * @return an already completed future holding the acknowledgement
 */
@Override
public CompletableFuture<Acknowledge> deregisterApplication(
        final ApplicationStatus finalStatus,
        @Nullable final String diagnostics) {
    log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, diagnostics);

    try {
        internalDeregisterApplication(finalStatus, diagnostics);
    } catch (ResourceManagerException deregistrationFailure) {
        // Best effort: the caller is acknowledged even if deregistration failed.
        log.warn("Could not properly shutdown the application.", deregistrationFailure);
    }

    final CompletableFuture<Acknowledge> acknowledged = CompletableFuture.completedFuture(Acknowledge.get());
    return acknowledged;
}
示例19
/**
 * Builds a {@link JobResult} with status {@link ApplicationStatus#CANCELED}, a net runtime
 * of 2 and a serialized {@code JobCancellationException} as failure cause.
 *
 * @param jobId id of the job the result belongs to
 * @return the cancelled job result
 */
private static JobResult createCancelledJobResult(final JobID jobId) {
    final SerializedThrowable cancellationCause =
        new SerializedThrowable(new JobCancellationException(jobId, "Hello", null));

    return new JobResult.Builder()
        .jobId(jobId)
        .netRuntime(2L)
        .serializedThrowable(cancellationCause)
        .applicationStatus(ApplicationStatus.CANCELED)
        .build();
}
示例20
/**
 * Verifies that, after the dispatcher runner has been closed, completing the leader process's
 * shut-down future does not complete the runner's own shut-down future within the wait time.
 */
@Test
public void getShutDownFuture_afterClose_ignoresDispatcherLeaderProcessShutDownRequest() throws Exception {
    final UUID leaderSessionId = UUID.randomUUID();
    final CompletableFuture<ApplicationStatus> leaderProcessShutDownFuture = new CompletableFuture<>();

    final TestingDispatcherLeaderProcess leaderProcess = TestingDispatcherLeaderProcess.newBuilder(leaderSessionId)
        .setShutDownFuture(leaderProcessShutDownFuture)
        .build();
    testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(leaderProcess);

    try (final DispatcherRunner dispatcherRunner = createDispatcherRunner()) {
        testingLeaderElectionService.isLeader(leaderSessionId);

        final CompletableFuture<ApplicationStatus> runnerShutDownFuture = dispatcherRunner.getShutDownFuture();
        assertFalse(runnerShutDownFuture.isDone());

        dispatcherRunner.closeAsync();

        // The leader process requests a shut-down only after the runner has been closed.
        leaderProcessShutDownFuture.complete(ApplicationStatus.UNKNOWN);

        try {
            runnerShutDownFuture.get(10L, TimeUnit.MILLISECONDS);
            fail("The dispatcher runner should no longer react to the dispatcher leader process's shut down request if it has been terminated.");
        } catch (TimeoutException expected) {
            // Expected: the runner's shut-down future must stay incomplete.
        }
    }
}
示例21
/**
 * Deregisters the Flink application and closes the cluster component, if one exists.
 *
 * <p>The check and the delegation happen while holding {@code lock}.
 *
 * @param applicationStatus to terminate the application with
 * @param diagnostics additional information about the shut down, can be {@code null}
 * @return the future returned by the cluster component's
 *     {@code deregisterApplicationAndClose}, or an already completed future if there is no
 *     cluster component
 */
private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
    synchronized (lock) {
        if (clusterComponent == null) {
            // Nothing to close.
            return CompletableFuture.completedFuture(null);
        }

        return clusterComponent.deregisterApplicationAndClose(applicationStatus, diagnostics);
    }
}
示例22
/**
 * Deregisters the Flink application from the resource management system by signalling
 * the {@link ResourceManager}, then closes this component.
 *
 * <p>Only the first call (the one that flips {@code isRunning} from {@code true} to
 * {@code false}) starts the sequence: close the web monitor endpoint, deregister the
 * application, then run {@code closeAsyncInternal}, chained via
 * {@code FutureUtils.composeAfterwards}. Any other call returns {@code terminationFuture}.
 *
 * @param applicationStatus to terminate the application with
 * @param diagnostics additional information about the shut down, can be {@code null}
 * @return future of the chained shut-down sequence, or {@code terminationFuture} if the
 *     component was not running any more
 */
public CompletableFuture<Void> deregisterApplicationAndClose(
        final ApplicationStatus applicationStatus,
        final @Nullable String diagnostics) {

    if (!isRunning.compareAndSet(true, false)) {
        return terminationFuture;
    }

    final CompletableFuture<Void> webMonitorClosedAndAppDeregistered =
        FutureUtils.composeAfterwards(
            webMonitorEndpoint.closeAsync(),
            () -> deregisterApplication(applicationStatus, diagnostics));

    return FutureUtils.composeAfterwards(webMonitorClosedAndAppDeregistered, this::closeAsyncInternal);
}
示例23
/**
 * Deregisters the Flink application from the resource management system by signalling
 * the {@link ResourceManager} and closes this component afterwards.
 *
 * <p>If {@code isRunning} can be flipped from {@code true} to {@code false}, the web monitor
 * endpoint is closed, the application is deregistered and {@code closeAsyncInternal} is run,
 * chained via {@code FutureUtils.composeAfterwards}. Otherwise {@code terminationFuture} is
 * returned.
 *
 * @param applicationStatus to terminate the application with
 * @param diagnostics additional information about the shut down, can be {@code null}
 * @return future of the chained shut-down sequence, or {@code terminationFuture} if the
 *     component was not running any more
 */
public CompletableFuture<Void> deregisterApplicationAndClose(
        final ApplicationStatus applicationStatus,
        final @Nullable String diagnostics) {

    final boolean initiatedShutDown = isRunning.compareAndSet(true, false);

    if (initiatedShutDown) {
        return FutureUtils.composeAfterwards(
            FutureUtils.composeAfterwards(
                webMonitorEndpoint.closeAsync(),
                () -> deregisterApplication(applicationStatus, diagnostics)),
            this::closeAsyncInternal);
    } else {
        return terminationFuture;
    }
}
示例24
/**
 * Submits, cancels and stops a job through the {@code RestClusterClient} and checks after
 * each call that the corresponding test handler has recorded the request.
 */
@Test
public void testJobSubmitCancelStop() throws Exception {
    final TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
    final TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();

    final JobResult succeededJobResult = new JobResult.Builder()
        .applicationStatus(ApplicationStatus.SUCCEEDED)
        .jobId(jobId)
        .netRuntime(Long.MAX_VALUE)
        .build();
    final TestJobExecutionResultHandler executionResultHandler =
        new TestJobExecutionResultHandler(JobExecutionResultResponseBody.created(succeededJobResult));

    try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(
            submitHandler,
            terminationHandler,
            executionResultHandler)) {

        final RestClusterClient<?> client = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());

        try {
            // submit
            Assert.assertFalse(submitHandler.jobSubmitted);
            client.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
            Assert.assertTrue(submitHandler.jobSubmitted);

            // cancel
            Assert.assertFalse(terminationHandler.jobCanceled);
            client.cancel(jobId);
            Assert.assertTrue(terminationHandler.jobCanceled);

            // stop
            Assert.assertFalse(terminationHandler.jobStopped);
            client.stop(jobId);
            Assert.assertTrue(terminationHandler.jobStopped);
        } finally {
            client.shutdown();
        }
    }
}
示例25
/**
 * Tests that application files are deleted when the YARN application master is de-registered.
 */
@Test
public void testDeleteApplicationFiles() throws Exception {
    new Context() {{
        // Directory registered under FLINK_YARN_FILES; it must be gone after deregistration.
        final File yarnFilesDir = folder.newFolder(".flink");
        final String yarnFilesPath = yarnFilesDir.getCanonicalPath();
        env.put(FLINK_YARN_FILES, yarnFilesPath);

        runTest(() -> {
            resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);

            final boolean directoryStillExists = Files.exists(yarnFilesDir.toPath());
            assertFalse("YARN application directory was not removed", directoryStillExists);
        });
    }};
}
示例26
/**
 * Checks that de-registering the YARN application master removes the application files.
 */
@Test
public void testDeleteApplicationFiles() throws Exception {
    new Context() {{
        final File flinkFilesDir = folder.newFolder(".flink");
        // Point FLINK_YARN_FILES at the freshly created directory.
        env.put(FLINK_YARN_FILES, flinkFilesDir.getCanonicalPath());

        runTest(() -> {
            resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);

            assertFalse(
                "YARN application directory was not removed",
                Files.exists(flinkFilesDir.toPath()));
        });
    }};
}
示例27
/**
 * Asks the resource manager, through its own gateway, to deregister the application.
 *
 * @param applicationStatus status to deregister the application with
 * @param diagnostics additional diagnostics, can be {@code null}
 * @return future derived from the gateway call, with the acknowledgement mapped to {@code null}
 */
private CompletableFuture<Void> deregisterApplication(
        final ApplicationStatus applicationStatus,
        final @Nullable String diagnostics) {

    return resourceManager
        .getSelfGateway(ResourceManagerGateway.class)
        .deregisterApplication(applicationStatus, diagnostics)
        .thenApply(ignored -> null);
}
示例28
/**
 * Delegates to the super implementation and then completes {@code jobTerminationFuture}
 * with {@link ApplicationStatus#UNKNOWN}.
 *
 * @param jobId id of the job that did not finish
 */
@Override
protected void jobNotFinished(JobID jobId) {
    super.jobNotFinished(jobId);

    // Shut down since we have done our job; no final status is known for it.
    final ApplicationStatus terminationStatus = ApplicationStatus.UNKNOWN;
    jobTerminationFuture.complete(terminationStatus);
}
示例29
/**
 * Creates the {@link JobResult} from the given {@link AccessExecutionGraph} which
 * must be in a globally terminal state.
 *
 * <p>The net runtime is computed from the timestamps of the terminal state and of
 * {@link JobStatus#CREATED} and is never negative. Unless the job is
 * {@link JobStatus#FINISHED}, the graph's failure info (if any) is added to the result.
 *
 * @param accessExecutionGraph to create the JobResult from
 * @return JobResult of the given AccessExecutionGraph
 */
public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
    final JobID jobId = accessExecutionGraph.getJobID();
    final JobStatus jobStatus = accessExecutionGraph.getState();

    checkArgument(
        jobStatus.isGloballyTerminalState(),
        "The job " + accessExecutionGraph.getJobName() + '(' + jobId + ") is not in a globally " +
            "terminal state. It is in state " + jobStatus + '.');

    final JobResult.Builder builder = new JobResult.Builder();
    builder.jobId(jobId);

    // Use the already validated state instead of querying the graph for it again.
    builder.applicationStatus(ApplicationStatus.fromJobStatus(jobStatus));

    final long terminalTimestamp = accessExecutionGraph.getStatusTimestamp(jobStatus);
    final long createdTimestamp = accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
    // guard against clock changes
    builder.netRuntime(Math.max(terminalTimestamp - createdTimestamp, 0L));

    builder.accumulatorResults(accessExecutionGraph.getAccumulatorsSerialized());

    if (jobStatus != JobStatus.FINISHED) {
        final ErrorInfo errorInfo = accessExecutionGraph.getFailureInfo();

        if (errorInfo != null) {
            builder.serializedThrowable(errorInfo.getException());
        }
    }

    return builder.build();
}
示例30
/**
 * Cleanup application and shut down cluster.
 *
 * <p>Deregistration is best effort: a {@code ResourceManagerException} is logged as a warning
 * and the call is acknowledged regardless.
 *
 * @param finalStatus of the Flink application
 * @param diagnostics diagnostics message for the Flink application or {@code null}
 * @return an already completed future holding the acknowledgement
 */
@Override
public CompletableFuture<Acknowledge> deregisterApplication(
        final ApplicationStatus finalStatus,
        @Nullable final String diagnostics) {

    log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, diagnostics);

    try {
        internalDeregisterApplication(finalStatus, diagnostics);
    } catch (ResourceManagerException cause) {
        log.warn("Could not properly shutdown the application.", cause);
    }

    // Always acknowledge, whether or not the deregistration above succeeded.
    return CompletableFuture.completedFuture(Acknowledge.get());
}