Java源码示例:org.apache.flink.runtime.webmonitor.WebMonitorUtils
示例1
/**
 * Checks that the store reports job details for every execution graph that was put into it.
 *
 * <p>The expected details are derived from the generated graphs through
 * {@code WebMonitorUtils.createDetailsForJob}; the comparison ignores ordering.
 */
@Test
public void testAvailableJobDetails() throws IOException {
    final int graphCount = 10;
    final Collection<ArchivedExecutionGraph> graphs = generateTerminalExecutionGraphs(graphCount);

    // what the store is expected to report once all graphs have been added
    final Collection<JobDetails> expectedDetails = graphs
        .stream()
        .map(graph -> WebMonitorUtils.createDetailsForJob(graph))
        .collect(Collectors.toList());

    final File storeDirectory = temporaryFolder.newFolder();

    try (final FileArchivedExecutionGraphStore store = createDefaultExecutionGraphStore(storeDirectory)) {
        for (ArchivedExecutionGraph graph : graphs) {
            store.put(graph);
        }

        assertThat(
            store.getAvailableJobDetails(),
            Matchers.containsInAnyOrder(expectedDetails.toArray()));
    }
}
示例2
/**
 * Checks that the store reports job details for every execution graph that was put into it.
 *
 * <p>The expected details are derived from the generated graphs through
 * {@code WebMonitorUtils.createDetailsForJob}; the comparison ignores ordering.
 */
@Test
public void testAvailableJobDetails() throws IOException {
    final int graphCount = 10;
    final Collection<ArchivedExecutionGraph> graphs = generateTerminalExecutionGraphs(graphCount);

    // what the store is expected to report once all graphs have been added
    final Collection<JobDetails> expectedDetails = graphs
        .stream()
        .map(graph -> WebMonitorUtils.createDetailsForJob(graph))
        .collect(Collectors.toList());

    final File storeDirectory = temporaryFolder.newFolder();

    try (final FileArchivedExecutionGraphStore store = createDefaultExecutionGraphStore(storeDirectory)) {
        for (ArchivedExecutionGraph graph : graphs) {
            store.put(graph);
        }

        assertThat(
            store.getAvailableJobDetails(),
            Matchers.containsInAnyOrder(expectedDetails.toArray()));
    }
}
示例3
/**
 * Selects the {@code HistoryServerArchivist} implementation for the given configuration.
 *
 * @param configuration configuration that is queried for {@code JobManagerOptions.ARCHIVE_DIR}
 * @param jsonArchivist archivist handed to the JSON-based implementation
 * @return a {@code JsonResponseHistoryServerArchivist} writing to the validated archive path
 *         if an archive directory is configured, {@code VoidHistoryServerArchivist.INSTANCE} otherwise
 */
static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist) {
    final String archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR);

    if (archiveDir == null) {
        // no archive directory configured
        return VoidHistoryServerArchivist.INSTANCE;
    }

    final Path configuredPath = new Path(archiveDir);
    final Path normalizedPath = WebMonitorUtils.validateAndNormalizeUri(configuredPath.toUri());
    return new JsonResponseHistoryServerArchivist(jsonArchivist, normalizedPath);
}
示例4
/**
 * Looks up the details of a single stored job.
 *
 * @param jobId id used as key into {@code serializableExecutionGraphs}
 * @return details created from the stored execution graph, or {@code null} if no graph is
 *         stored under {@code jobId}
 */
@Nullable
@Override
public JobDetails getAvailableJobDetails(JobID jobId) {
    final ArchivedExecutionGraph storedGraph = serializableExecutionGraphs.get(jobId);
    return storedGraph == null
        ? null
        : WebMonitorUtils.createDetailsForJob(storedGraph);
}
示例5
/**
 * Adds the given execution graph to this store.
 *
 * <p>The graph must be in a globally terminal state. The counter matching its state is
 * incremented, the graph is written to disk, and both the job details cache and the
 * execution graph cache are updated.
 *
 * @param archivedExecutionGraph graph to store
 * @throws IOException declared by the signature; presumably raised while persisting the graph
 * @throws IllegalStateException if the state passed the terminal-state check but is none of
 *         FINISHED, CANCELED or FAILED
 */
@Override
public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    final JobStatus jobStatus = archivedExecutionGraph.getState();
    final JobID jobId = archivedExecutionGraph.getJobID();
    final String jobName = archivedExecutionGraph.getJobName();

    final boolean globallyTerminal = jobStatus.isGloballyTerminalState();
    // common prefix of both error messages: "The job <name>(<id>"
    final String jobLabel = "The job " + jobName + '(' + jobId;

    Preconditions.checkArgument(
        globallyTerminal,
        jobLabel + ") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');

    if (jobStatus == JobStatus.FINISHED) {
        numFinishedJobs++;
    } else if (jobStatus == JobStatus.CANCELED) {
        numCanceledJobs++;
    } else if (jobStatus == JobStatus.FAILED) {
        numFailedJobs++;
    } else {
        throw new IllegalStateException(
            jobLabel + ") should have been in a globally terminal state. "
                + "Instead it was in state " + jobStatus + '.');
    }

    // write the ArchivedExecutionGraph to disk
    storeArchivedExecutionGraph(archivedExecutionGraph);

    // refresh both caches with the freshly stored graph
    jobDetailsCache.put(jobId, WebMonitorUtils.createDetailsForJob(archivedExecutionGraph));
    archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
}
示例6
/**
 * Builds the archive entry for the given execution graph.
 *
 * <p>The response body wraps the job's details in a {@code MultipleJobsDetails}; the path is
 * the handler's target REST endpoint URL with the job id placeholder replaced by the
 * graph's job id.
 *
 * @param graph execution graph to archive
 * @return a single-element list holding the archived JSON
 */
@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
    final ResponseBody responseBody = new MultipleJobsDetails(
        Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));

    final String jobIdPlaceholder = ':' + JobIDPathParameter.KEY;
    final String jobIdValue = graph.getJobID().toString();
    String archivePath = getMessageHeaders().getTargetRestEndpointURL()
        .replace(jobIdPlaceholder, jobIdValue);

    return Collections.singletonList(new ArchivedJson(archivePath, responseBody));
}
示例7
/**
 * Selects the {@code HistoryServerArchivist} implementation for the given configuration.
 *
 * @param configuration configuration that is queried for {@code JobManagerOptions.ARCHIVE_DIR}
 * @param jsonArchivist archivist handed to the JSON-based implementation
 * @return a {@code JsonResponseHistoryServerArchivist} writing to the validated archive path
 *         if an archive directory is configured, {@code VoidHistoryServerArchivist.INSTANCE} otherwise
 */
static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist) {
    final String archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR);

    if (archiveDir == null) {
        // no archive directory configured
        return VoidHistoryServerArchivist.INSTANCE;
    }

    final Path configuredPath = new Path(archiveDir);
    final Path normalizedPath = WebMonitorUtils.validateAndNormalizeUri(configuredPath.toUri());
    return new JsonResponseHistoryServerArchivist(jsonArchivist, normalizedPath);
}
示例8
/**
 * Looks up the details of a single stored job.
 *
 * @param jobId id used as key into {@code serializableExecutionGraphs}
 * @return details created from the stored execution graph, or {@code null} if no graph is
 *         stored under {@code jobId}
 */
@Nullable
@Override
public JobDetails getAvailableJobDetails(JobID jobId) {
    final ArchivedExecutionGraph storedGraph = serializableExecutionGraphs.get(jobId);
    return storedGraph == null
        ? null
        : WebMonitorUtils.createDetailsForJob(storedGraph);
}
示例9
/**
 * Adds the given execution graph to this store.
 *
 * <p>The graph must be in a globally terminal state. The counter matching its state is
 * incremented, the graph is written to disk, and both the job details cache and the
 * execution graph cache are updated.
 *
 * @param archivedExecutionGraph graph to store
 * @throws IOException declared by the signature; presumably raised while persisting the graph
 * @throws IllegalStateException if the state passed the terminal-state check but is none of
 *         FINISHED, CANCELED or FAILED
 */
@Override
public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    final JobStatus jobStatus = archivedExecutionGraph.getState();
    final JobID jobId = archivedExecutionGraph.getJobID();
    final String jobName = archivedExecutionGraph.getJobName();

    final boolean globallyTerminal = jobStatus.isGloballyTerminalState();
    // common prefix of both error messages: "The job <name>(<id>"
    final String jobLabel = "The job " + jobName + '(' + jobId;

    Preconditions.checkArgument(
        globallyTerminal,
        jobLabel + ") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');

    if (jobStatus == JobStatus.FINISHED) {
        numFinishedJobs++;
    } else if (jobStatus == JobStatus.CANCELED) {
        numCanceledJobs++;
    } else if (jobStatus == JobStatus.FAILED) {
        numFailedJobs++;
    } else {
        throw new IllegalStateException(
            jobLabel + ") should have been in a globally terminal state. "
                + "Instead it was in state " + jobStatus + '.');
    }

    // write the ArchivedExecutionGraph to disk
    storeArchivedExecutionGraph(archivedExecutionGraph);

    // refresh both caches with the freshly stored graph
    jobDetailsCache.put(jobId, WebMonitorUtils.createDetailsForJob(archivedExecutionGraph));
    archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
}
示例10
/**
 * Builds the archive entry for the given execution graph.
 *
 * <p>The response body wraps the job's details in a {@code MultipleJobsDetails}; the path is
 * the handler's target REST endpoint URL with the job id placeholder replaced by the
 * graph's job id.
 *
 * @param graph execution graph to archive
 * @return a single-element list holding the archived JSON
 */
@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
    final ResponseBody responseBody = new MultipleJobsDetails(
        Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));

    final String jobIdPlaceholder = ':' + JobIDPathParameter.KEY;
    final String jobIdValue = graph.getJobID().toString();
    String archivePath = getMessageHeaders().getTargetRestEndpointURL()
        .replace(jobIdPlaceholder, jobIdValue);

    return Collections.singletonList(new ArchivedJson(archivePath, responseBody));
}
示例11
/**
 * Selects the {@code HistoryServerArchivist} implementation for the given configuration.
 *
 * @param configuration configuration that is queried for {@code JobManagerOptions.ARCHIVE_DIR}
 * @param jsonArchivist archivist handed to the JSON-based implementation
 * @param ioExecutor executor handed to the JSON-based implementation
 * @return a {@code JsonResponseHistoryServerArchivist} writing to the validated archive path
 *         if an archive directory is configured, {@code VoidHistoryServerArchivist.INSTANCE} otherwise
 */
static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist, Executor ioExecutor) {
    final String archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR);

    if (archiveDir == null) {
        // no archive directory configured
        return VoidHistoryServerArchivist.INSTANCE;
    }

    final Path configuredPath = new Path(archiveDir);
    final Path normalizedPath = WebMonitorUtils.validateAndNormalizeUri(configuredPath.toUri());
    return new JsonResponseHistoryServerArchivist(jsonArchivist, normalizedPath, ioExecutor);
}
示例12
/**
 * Looks up the details of a single stored job.
 *
 * @param jobId id used as key into {@code serializableExecutionGraphs}
 * @return details created from the stored execution graph, or {@code null} if no graph is
 *         stored under {@code jobId}
 */
@Nullable
@Override
public JobDetails getAvailableJobDetails(JobID jobId) {
    final ArchivedExecutionGraph storedGraph = serializableExecutionGraphs.get(jobId);
    return storedGraph == null
        ? null
        : WebMonitorUtils.createDetailsForJob(storedGraph);
}
示例13
/**
 * Adds the given execution graph to this store.
 *
 * <p>The graph must be in a globally terminal state. The counter matching its state is
 * incremented, the graph is written to disk, and both the job details cache and the
 * execution graph cache are updated.
 *
 * @param archivedExecutionGraph graph to store
 * @throws IOException declared by the signature; presumably raised while persisting the graph
 * @throws IllegalStateException if the state passed the terminal-state check but is none of
 *         FINISHED, CANCELED or FAILED
 */
@Override
public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    final JobStatus jobStatus = archivedExecutionGraph.getState();
    final JobID jobId = archivedExecutionGraph.getJobID();
    final String jobName = archivedExecutionGraph.getJobName();

    final boolean globallyTerminal = jobStatus.isGloballyTerminalState();
    // common prefix of both error messages: "The job <name>(<id>"
    final String jobLabel = "The job " + jobName + '(' + jobId;

    Preconditions.checkArgument(
        globallyTerminal,
        jobLabel + ") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');

    if (jobStatus == JobStatus.FINISHED) {
        numFinishedJobs++;
    } else if (jobStatus == JobStatus.CANCELED) {
        numCanceledJobs++;
    } else if (jobStatus == JobStatus.FAILED) {
        numFailedJobs++;
    } else {
        throw new IllegalStateException(
            jobLabel + ") should have been in a globally terminal state. "
                + "Instead it was in state " + jobStatus + '.');
    }

    // write the ArchivedExecutionGraph to disk
    storeArchivedExecutionGraph(archivedExecutionGraph);

    // refresh both caches with the freshly stored graph
    jobDetailsCache.put(jobId, WebMonitorUtils.createDetailsForJob(archivedExecutionGraph));
    archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
}
示例14
/**
 * Builds the archive entry for the given execution graph.
 *
 * <p>The response body wraps the job's details in a {@code MultipleJobsDetails}; the path is
 * the handler's target REST endpoint URL with the job id placeholder replaced by the
 * graph's job id.
 *
 * @param graph execution graph to archive
 * @return a single-element list holding the archived JSON
 */
@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
    final ResponseBody responseBody = new MultipleJobsDetails(
        Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));

    final String jobIdPlaceholder = ':' + JobIDPathParameter.KEY;
    final String jobIdValue = graph.getJobID().toString();
    String archivePath = getMessageHeaders().getTargetRestEndpointURL()
        .replace(jobIdPlaceholder, jobIdValue);

    return Collections.singletonList(new ArchivedJson(archivePath, responseBody));
}
示例15
/**
 * Lists the details of all stored jobs.
 *
 * @return one {@code JobDetails} per value in {@code serializableExecutionGraphs}, each
 *         created through {@code WebMonitorUtils.createDetailsForJob}
 */
@Override
public Collection<JobDetails> getAvailableJobDetails() {
    return serializableExecutionGraphs
        .values()
        .stream()
        .map(storedGraph -> WebMonitorUtils.createDetailsForJob(storedGraph))
        .collect(Collectors.toList());
}
示例16
/**
 * Extends the handlers of the super class with the Dispatcher specific ones.
 *
 * <p>If {@code WebOptions.SUBMIT_ENABLE} is set, the web submission extension is loaded and
 * its handlers are registered; a failure to load it is only logged. The job submit handler
 * is always appended last.
 *
 * @param localAddressFuture future passed on to the super class and to the extension loader
 * @return the complete list of handlers
 */
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> allHandlers = super.initializeHandlers(localAddressFuture);

    // Add the Dispatcher specific handlers
    final Time requestTimeout = restConfiguration.getTimeout();

    JobSubmitHandler submitHandler = new JobSubmitHandler(
        leaderRetriever,
        requestTimeout,
        responseHeaders,
        executor,
        clusterConfiguration);

    final boolean webSubmitEnabled = clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE);

    if (!webSubmitEnabled) {
        log.info("Web-based job submission is not enabled.");
    } else {
        try {
            webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                leaderRetriever,
                requestTimeout,
                responseHeaders,
                localAddressFuture,
                uploadDir,
                executor,
                clusterConfiguration);

            // register extension handlers
            allHandlers.addAll(webSubmissionExtension.getHandlers());
        } catch (FlinkException e) {
            // the stack trace is only logged on debug level
            if (!log.isDebugEnabled()) {
                log.info("Failed to load web based job submission extension. " +
                    "Probable reason: flink-runtime-web is not in the classpath.");
            } else {
                log.debug("Failed to load web based job submission extension.", e);
            }
        }
    }

    allHandlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler));
    return allHandlers;
}
示例17
/**
 * Asynchronously creates the details of the current job.
 *
 * <p>The {@code executionGraph} reference is read before the task is submitted, so the
 * supplier works on the graph that was current at call time.
 *
 * @param timeout not used by this implementation
 * @return future completed on {@code scheduledExecutorService} with the job details
 */
@Override
public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
    final ExecutionGraph graphAtCallTime = executionGraph;

    return CompletableFuture.supplyAsync(
        () -> {
            return WebMonitorUtils.createDetailsForJob(graphAtCallTime);
        },
        scheduledExecutorService);
}
示例18
/**
 * Lists the details of all stored jobs.
 *
 * @return one {@code JobDetails} per value in {@code serializableExecutionGraphs}, each
 *         created through {@code WebMonitorUtils.createDetailsForJob}
 */
@Override
public Collection<JobDetails> getAvailableJobDetails() {
    return serializableExecutionGraphs
        .values()
        .stream()
        .map(storedGraph -> WebMonitorUtils.createDetailsForJob(storedGraph))
        .collect(Collectors.toList());
}
示例19
/**
 * Extends the handlers of the super class with the Dispatcher specific ones.
 *
 * <p>If {@code WebOptions.SUBMIT_ENABLE} is set, the web submission extension is loaded and
 * its handlers are registered; a failure to load it is only logged. The job submit handler
 * is always appended last.
 *
 * @param localAddressFuture future passed on to the super class and to the extension loader
 * @return the complete list of handlers
 */
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> allHandlers = super.initializeHandlers(localAddressFuture);

    // Add the Dispatcher specific handlers
    final Time requestTimeout = restConfiguration.getTimeout();

    JobSubmitHandler submitHandler = new JobSubmitHandler(
        leaderRetriever,
        requestTimeout,
        responseHeaders,
        executor,
        clusterConfiguration);

    final boolean webSubmitEnabled = clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE);

    if (!webSubmitEnabled) {
        log.info("Web-based job submission is not enabled.");
    } else {
        try {
            webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                leaderRetriever,
                requestTimeout,
                responseHeaders,
                localAddressFuture,
                uploadDir,
                executor,
                clusterConfiguration);

            // register extension handlers
            allHandlers.addAll(webSubmissionExtension.getHandlers());
        } catch (FlinkException e) {
            // the stack trace is only logged on debug level
            if (!log.isDebugEnabled()) {
                log.info("Failed to load web based job submission extension. " +
                    "Probable reason: flink-runtime-web is not in the classpath.");
            } else {
                log.debug("Failed to load web based job submission extension.", e);
            }
        }
    }

    allHandlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler));
    return allHandlers;
}
示例20
/**
 * Creates the details of the current job.
 *
 * <p>Asserts via {@code mainThreadExecutor} that the call happens in the main thread before
 * the execution graph is read.
 *
 * @return details created from {@code executionGraph}
 */
@Override
public JobDetails requestJobDetails() {
    mainThreadExecutor.assertRunningInMainThread();

    final JobDetails currentDetails = WebMonitorUtils.createDetailsForJob(executionGraph);
    return currentDetails;
}
示例21
/**
 * Lists the details of all stored jobs.
 *
 * @return one {@code JobDetails} per value in {@code serializableExecutionGraphs}, each
 *         created through {@code WebMonitorUtils.createDetailsForJob}
 */
@Override
public Collection<JobDetails> getAvailableJobDetails() {
    return serializableExecutionGraphs
        .values()
        .stream()
        .map(storedGraph -> WebMonitorUtils.createDetailsForJob(storedGraph))
        .collect(Collectors.toList());
}
示例22
/**
 * Extends the handlers of the super class with the Dispatcher specific ones.
 *
 * <p>If {@code restConfiguration.isWebSubmitEnabled()} returns true, the web submission
 * extension is loaded and its handlers are registered; a failure to load it is only logged.
 * The job submit handler is always appended last.
 *
 * @param localAddressFuture future passed on to the super class and to the extension loader
 * @return the complete list of handlers
 */
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> allHandlers = super.initializeHandlers(localAddressFuture);

    // Add the Dispatcher specific handlers
    final Time requestTimeout = restConfiguration.getTimeout();

    JobSubmitHandler submitHandler = new JobSubmitHandler(
        leaderRetriever,
        requestTimeout,
        responseHeaders,
        executor,
        clusterConfiguration);

    final boolean webSubmitEnabled = restConfiguration.isWebSubmitEnabled();

    if (!webSubmitEnabled) {
        log.info("Web-based job submission is not enabled.");
    } else {
        try {
            webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                leaderRetriever,
                requestTimeout,
                responseHeaders,
                localAddressFuture,
                uploadDir,
                executor,
                clusterConfiguration);

            // register extension handlers
            allHandlers.addAll(webSubmissionExtension.getHandlers());
        } catch (FlinkException e) {
            // the stack trace is only logged on debug level
            if (!log.isDebugEnabled()) {
                log.info("Failed to load web based job submission extension. " +
                    "Probable reason: flink-runtime-web is not in the classpath.");
            } else {
                log.debug("Failed to load web based job submission extension.", e);
            }
        }
    }

    allHandlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler));
    return allHandlers;
}
示例23
/**
 * Creates the details of the current job.
 *
 * <p>Asserts via {@code mainThreadExecutor} that the call happens in the main thread before
 * the execution graph is read.
 *
 * @return details created from {@code executionGraph}
 */
@Override
public JobDetails requestJobDetails() {
    mainThreadExecutor.assertRunningInMainThread();

    final JobDetails currentDetails = WebMonitorUtils.createDetailsForJob(executionGraph);
    return currentDetails;
}
示例24
/**
 * Maps every given execution graph to its job details.
 *
 * @param executionGraphs graphs to convert
 * @return one {@code JobDetails} per graph, created through
 *         {@code WebMonitorUtils.createDetailsForJob}
 */
private static Collection<JobDetails> generateJobDetails(Collection<ArchivedExecutionGraph> executionGraphs) {
    return executionGraphs
        .stream()
        .map(graph -> WebMonitorUtils.createDetailsForJob(graph))
        .collect(Collectors.toList());
}