Java源码示例:org.apache.flink.runtime.webmonitor.WebMonitorUtils

示例1
/**
 * Checks that the store reports the job details of every execution graph that was put into it.
 */
@Test
public void testAvailableJobDetails() throws IOException {
	final int graphCount = 10;
	final Collection<ArchivedExecutionGraph> terminalGraphs = generateTerminalExecutionGraphs(graphCount);

	// expected details, derived directly from the generated graphs
	final Object[] expectedDetails = terminalGraphs.stream()
		.map(WebMonitorUtils::createDetailsForJob)
		.toArray();

	final File storageDir = temporaryFolder.newFolder();

	try (final FileArchivedExecutionGraphStore store = createDefaultExecutionGraphStore(storageDir)) {
		for (ArchivedExecutionGraph graph : terminalGraphs) {
			store.put(graph);
		}

		// the order in which the store returns the details is irrelevant
		final Collection<JobDetails> actualDetails = store.getAvailableJobDetails();
		assertThat(actualDetails, Matchers.containsInAnyOrder(expectedDetails));
	}
}
 
示例2
/**
 * Checks that the store reports the job details of every execution graph that was put into it.
 */
@Test
public void testAvailableJobDetails() throws IOException {
	final int graphCount = 10;
	final Collection<ArchivedExecutionGraph> terminalGraphs = generateTerminalExecutionGraphs(graphCount);

	// expected details, derived directly from the generated graphs
	final Object[] expectedDetails = terminalGraphs.stream()
		.map(WebMonitorUtils::createDetailsForJob)
		.toArray();

	final File storageDir = temporaryFolder.newFolder();

	try (final FileArchivedExecutionGraphStore store = createDefaultExecutionGraphStore(storageDir)) {
		for (ArchivedExecutionGraph graph : terminalGraphs) {
			store.put(graph);
		}

		// the order in which the store returns the details is irrelevant
		final Collection<JobDetails> actualDetails = store.getAvailableJobDetails();
		assertThat(actualDetails, Matchers.containsInAnyOrder(expectedDetails));
	}
}
 
示例3
/**
 * Creates the {@link HistoryServerArchivist} matching the given configuration.
 *
 * @param configuration configuration from which {@code JobManagerOptions.ARCHIVE_DIR} is read
 * @param jsonArchivist archivist handed to the created {@link JsonResponseHistoryServerArchivist}
 * @return a {@link JsonResponseHistoryServerArchivist} built with the normalized archive path if an
 *     archive directory is configured, otherwise {@code VoidHistoryServerArchivist.INSTANCE}
 */
static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist) {
	final String archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR);

	// no archive directory configured: nothing to archive to
	if (archiveDir == null) {
		return VoidHistoryServerArchivist.INSTANCE;
	}

	final Path rawArchivePath = new Path(archiveDir);
	final Path normalizedArchivePath = WebMonitorUtils.validateAndNormalizeUri(rawArchivePath.toUri());

	return new JsonResponseHistoryServerArchivist(jsonArchivist, normalizedArchivePath);
}
 
示例4
/**
 * Looks up the stored execution graph of the given job and converts it into {@link JobDetails}.
 *
 * @param jobId id of the job to look up
 * @return the details of the job, or {@code null} if no execution graph is stored for the id
 */
@Nullable
@Override
public JobDetails getAvailableJobDetails(JobID jobId) {
	final ArchivedExecutionGraph storedGraph = serializableExecutionGraphs.get(jobId);

	return storedGraph == null
		? null
		: WebMonitorUtils.createDetailsForJob(storedGraph);
}
 
示例5
/**
 * Stores the given execution graph, bumps the counter matching its state and caches both the
 * graph and the job details derived from it.
 *
 * <p>The precondition check rejects graphs whose state is not globally terminal.
 *
 * @param archivedExecutionGraph the execution graph to store
 * @throws IOException declared for the step that stores the graph
 * @throws IllegalStateException if the state passes the precondition check but is none of
 *     {@code FINISHED}, {@code CANCELED} or {@code FAILED}
 */
@Override
public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
	final JobStatus state = archivedExecutionGraph.getState();
	final JobID id = archivedExecutionGraph.getJobID();
	final String name = archivedExecutionGraph.getJobName();

	final boolean globallyTerminal = state.isGloballyTerminalState();
	final String notTerminalMessage = "The job " + name + '(' + id +
		") is not in a globally terminal state. Instead it is in state " + state + '.';
	Preconditions.checkArgument(globallyTerminal, notTerminalMessage);

	// keep the per-state counters in sync with the accepted graph
	if (state == JobStatus.FINISHED) {
		numFinishedJobs++;
	} else if (state == JobStatus.CANCELED) {
		numCanceledJobs++;
	} else if (state == JobStatus.FAILED) {
		numFailedJobs++;
	} else {
		throw new IllegalStateException("The job " + name + '(' +
			id + ") should have been in a globally terminal state. " +
			"Instead it was in state " + state + '.');
	}

	// persist the graph before it becomes visible through the caches
	storeArchivedExecutionGraph(archivedExecutionGraph);

	final JobDetails details = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);

	jobDetailsCache.put(id, details);
	archivedExecutionGraphCache.put(id, archivedExecutionGraph);
}
 
示例6
/**
 * Archives the job details of the given graph as a single JSON entry.
 *
 * <p>The entry's path is this handler's target REST endpoint URL with the job id path parameter
 * replaced by the id of the graph's job.
 *
 * @param graph execution graph whose job details are archived
 * @return a singleton list holding the archived entry
 */
@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
	final ResponseBody jobsDetails = new MultipleJobsDetails(
		Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));

	final String urlTemplate = getMessageHeaders().getTargetRestEndpointURL();
	final String jobIdPlaceholder = ':' + JobIDPathParameter.KEY;
	final String archivePath = urlTemplate.replace(jobIdPlaceholder, graph.getJobID().toString());

	final ArchivedJson archivedEntry = new ArchivedJson(archivePath, jobsDetails);
	return Collections.singletonList(archivedEntry);
}
 
示例7
/**
 * Creates the {@link HistoryServerArchivist} matching the given configuration.
 *
 * @param configuration configuration from which {@code JobManagerOptions.ARCHIVE_DIR} is read
 * @param jsonArchivist archivist handed to the created {@link JsonResponseHistoryServerArchivist}
 * @return a {@link JsonResponseHistoryServerArchivist} built with the normalized archive path if an
 *     archive directory is configured, otherwise {@code VoidHistoryServerArchivist.INSTANCE}
 */
static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist) {
	final String archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR);

	// no archive directory configured: nothing to archive to
	if (archiveDir == null) {
		return VoidHistoryServerArchivist.INSTANCE;
	}

	final Path rawArchivePath = new Path(archiveDir);
	final Path normalizedArchivePath = WebMonitorUtils.validateAndNormalizeUri(rawArchivePath.toUri());

	return new JsonResponseHistoryServerArchivist(jsonArchivist, normalizedArchivePath);
}
 
示例8
/**
 * Looks up the stored execution graph of the given job and converts it into {@link JobDetails}.
 *
 * @param jobId id of the job to look up
 * @return the details of the job, or {@code null} if no execution graph is stored for the id
 */
@Nullable
@Override
public JobDetails getAvailableJobDetails(JobID jobId) {
	final ArchivedExecutionGraph storedGraph = serializableExecutionGraphs.get(jobId);

	return storedGraph == null
		? null
		: WebMonitorUtils.createDetailsForJob(storedGraph);
}
 
示例9
/**
 * Stores the given execution graph, bumps the counter matching its state and caches both the
 * graph and the job details derived from it.
 *
 * <p>The precondition check rejects graphs whose state is not globally terminal.
 *
 * @param archivedExecutionGraph the execution graph to store
 * @throws IOException declared for the step that stores the graph
 * @throws IllegalStateException if the state passes the precondition check but is none of
 *     {@code FINISHED}, {@code CANCELED} or {@code FAILED}
 */
@Override
public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
	final JobStatus state = archivedExecutionGraph.getState();
	final JobID id = archivedExecutionGraph.getJobID();
	final String name = archivedExecutionGraph.getJobName();

	final boolean globallyTerminal = state.isGloballyTerminalState();
	final String notTerminalMessage = "The job " + name + '(' + id +
		") is not in a globally terminal state. Instead it is in state " + state + '.';
	Preconditions.checkArgument(globallyTerminal, notTerminalMessage);

	// keep the per-state counters in sync with the accepted graph
	if (state == JobStatus.FINISHED) {
		numFinishedJobs++;
	} else if (state == JobStatus.CANCELED) {
		numCanceledJobs++;
	} else if (state == JobStatus.FAILED) {
		numFailedJobs++;
	} else {
		throw new IllegalStateException("The job " + name + '(' +
			id + ") should have been in a globally terminal state. " +
			"Instead it was in state " + state + '.');
	}

	// persist the graph before it becomes visible through the caches
	storeArchivedExecutionGraph(archivedExecutionGraph);

	final JobDetails details = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);

	jobDetailsCache.put(id, details);
	archivedExecutionGraphCache.put(id, archivedExecutionGraph);
}
 
示例10
/**
 * Archives the job details of the given graph as a single JSON entry.
 *
 * <p>The entry's path is this handler's target REST endpoint URL with the job id path parameter
 * replaced by the id of the graph's job.
 *
 * @param graph execution graph whose job details are archived
 * @return a singleton list holding the archived entry
 */
@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
	final ResponseBody jobsDetails = new MultipleJobsDetails(
		Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));

	final String urlTemplate = getMessageHeaders().getTargetRestEndpointURL();
	final String jobIdPlaceholder = ':' + JobIDPathParameter.KEY;
	final String archivePath = urlTemplate.replace(jobIdPlaceholder, graph.getJobID().toString());

	final ArchivedJson archivedEntry = new ArchivedJson(archivePath, jobsDetails);
	return Collections.singletonList(archivedEntry);
}
 
示例11
/**
 * Creates the {@link HistoryServerArchivist} matching the given configuration.
 *
 * @param configuration configuration from which {@code JobManagerOptions.ARCHIVE_DIR} is read
 * @param jsonArchivist archivist handed to the created {@link JsonResponseHistoryServerArchivist}
 * @param ioExecutor executor handed to the created {@link JsonResponseHistoryServerArchivist}
 * @return a {@link JsonResponseHistoryServerArchivist} built with the normalized archive path if an
 *     archive directory is configured, otherwise {@code VoidHistoryServerArchivist.INSTANCE}
 */
static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist, Executor ioExecutor) {
	final String archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR);

	// no archive directory configured: nothing to archive to
	if (archiveDir == null) {
		return VoidHistoryServerArchivist.INSTANCE;
	}

	final Path rawArchivePath = new Path(archiveDir);
	final Path normalizedArchivePath = WebMonitorUtils.validateAndNormalizeUri(rawArchivePath.toUri());

	return new JsonResponseHistoryServerArchivist(jsonArchivist, normalizedArchivePath, ioExecutor);
}
 
示例12
/**
 * Looks up the stored execution graph of the given job and converts it into {@link JobDetails}.
 *
 * @param jobId id of the job to look up
 * @return the details of the job, or {@code null} if no execution graph is stored for the id
 */
@Nullable
@Override
public JobDetails getAvailableJobDetails(JobID jobId) {
	final ArchivedExecutionGraph storedGraph = serializableExecutionGraphs.get(jobId);

	return storedGraph == null
		? null
		: WebMonitorUtils.createDetailsForJob(storedGraph);
}
 
示例13
/**
 * Stores the given execution graph, bumps the counter matching its state and caches both the
 * graph and the job details derived from it.
 *
 * <p>The precondition check rejects graphs whose state is not globally terminal.
 *
 * @param archivedExecutionGraph the execution graph to store
 * @throws IOException declared for the step that stores the graph
 * @throws IllegalStateException if the state passes the precondition check but is none of
 *     {@code FINISHED}, {@code CANCELED} or {@code FAILED}
 */
@Override
public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
	final JobStatus state = archivedExecutionGraph.getState();
	final JobID id = archivedExecutionGraph.getJobID();
	final String name = archivedExecutionGraph.getJobName();

	final boolean globallyTerminal = state.isGloballyTerminalState();
	final String notTerminalMessage = "The job " + name + '(' + id +
		") is not in a globally terminal state. Instead it is in state " + state + '.';
	Preconditions.checkArgument(globallyTerminal, notTerminalMessage);

	// keep the per-state counters in sync with the accepted graph
	if (state == JobStatus.FINISHED) {
		numFinishedJobs++;
	} else if (state == JobStatus.CANCELED) {
		numCanceledJobs++;
	} else if (state == JobStatus.FAILED) {
		numFailedJobs++;
	} else {
		throw new IllegalStateException("The job " + name + '(' +
			id + ") should have been in a globally terminal state. " +
			"Instead it was in state " + state + '.');
	}

	// persist the graph before it becomes visible through the caches
	storeArchivedExecutionGraph(archivedExecutionGraph);

	final JobDetails details = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);

	jobDetailsCache.put(id, details);
	archivedExecutionGraphCache.put(id, archivedExecutionGraph);
}
 
示例14
/**
 * Archives the job details of the given graph as a single JSON entry.
 *
 * <p>The entry's path is this handler's target REST endpoint URL with the job id path parameter
 * replaced by the id of the graph's job.
 *
 * @param graph execution graph whose job details are archived
 * @return a singleton list holding the archived entry
 */
@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
	final ResponseBody jobsDetails = new MultipleJobsDetails(
		Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));

	final String urlTemplate = getMessageHeaders().getTargetRestEndpointURL();
	final String jobIdPlaceholder = ':' + JobIDPathParameter.KEY;
	final String archivePath = urlTemplate.replace(jobIdPlaceholder, graph.getJobID().toString());

	final ArchivedJson archivedEntry = new ArchivedJson(archivePath, jobsDetails);
	return Collections.singletonList(archivedEntry);
}
 
示例15
/**
 * Converts every stored execution graph into its {@link JobDetails}.
 *
 * @return a newly collected list with one entry per stored execution graph
 */
@Override
public Collection<JobDetails> getAvailableJobDetails() {
	return serializableExecutionGraphs
		.values()
		.stream()
		.map(storedGraph -> WebMonitorUtils.createDetailsForJob(storedGraph))
		.collect(Collectors.toList());
}
 
示例16
/**
 * Extends the handlers created by the superclass with the Dispatcher specific ones.
 *
 * <p>The job submit handler is always registered. If {@code WebOptions.SUBMIT_ENABLE} is set, the
 * handlers of the web submission extension are registered as well. A failure to load the
 * extension is only logged; the stack trace is included only when debug logging is enabled.
 *
 * @param localAddressFuture future passed on to the superclass and to the web submission extension
 * @return the handlers of the superclass plus the Dispatcher specific handlers
 */
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
	final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> allHandlers = super.initializeHandlers(localAddressFuture);

	final Time requestTimeout = restConfiguration.getTimeout();

	final JobSubmitHandler submitHandler = new JobSubmitHandler(
		leaderRetriever,
		requestTimeout,
		responseHeaders,
		executor,
		clusterConfiguration);

	final boolean webSubmitEnabled = clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE);

	if (!webSubmitEnabled) {
		log.info("Web-based job submission is not enabled.");
	} else {
		try {
			webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
				leaderRetriever,
				requestTimeout,
				responseHeaders,
				localAddressFuture,
				uploadDir,
				executor,
				clusterConfiguration);

			// the extension contributes its own handlers
			allHandlers.addAll(webSubmissionExtension.getHandlers());
		} catch (FlinkException e) {
			// a missing extension is tolerated; only debug logging shows the cause
			if (log.isDebugEnabled()) {
				log.debug("Failed to load web based job submission extension.", e);
			} else {
				log.info("Failed to load web based job submission extension. " +
					"Probable reason: flink-runtime-web is not in the classpath.");
			}
		}
	}

	allHandlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler));

	return allHandlers;
}
 
示例17
/**
 * Asynchronously builds the {@link JobDetails} of the execution graph.
 *
 * <p>The graph reference is read once when this method is called; the conversion itself runs on
 * {@code scheduledExecutorService}.
 *
 * @param timeout not used by this implementation
 * @return future completed with the job details
 */
@Override
public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
	// capture the reference now so the asynchronous task works on this exact graph
	final ExecutionGraph graphSnapshot = executionGraph;

	return CompletableFuture.supplyAsync(
		() -> WebMonitorUtils.createDetailsForJob(graphSnapshot),
		scheduledExecutorService);
}
 
示例18
/**
 * Converts every stored execution graph into its {@link JobDetails}.
 *
 * @return a newly collected list with one entry per stored execution graph
 */
@Override
public Collection<JobDetails> getAvailableJobDetails() {
	return serializableExecutionGraphs
		.values()
		.stream()
		.map(storedGraph -> WebMonitorUtils.createDetailsForJob(storedGraph))
		.collect(Collectors.toList());
}
 
示例19
/**
 * Extends the handlers created by the superclass with the Dispatcher specific ones.
 *
 * <p>The job submit handler is always registered. If {@code WebOptions.SUBMIT_ENABLE} is set, the
 * handlers of the web submission extension are registered as well. A failure to load the
 * extension is only logged; the stack trace is included only when debug logging is enabled.
 *
 * @param localAddressFuture future passed on to the superclass and to the web submission extension
 * @return the handlers of the superclass plus the Dispatcher specific handlers
 */
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
	final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> allHandlers = super.initializeHandlers(localAddressFuture);

	final Time requestTimeout = restConfiguration.getTimeout();

	final JobSubmitHandler submitHandler = new JobSubmitHandler(
		leaderRetriever,
		requestTimeout,
		responseHeaders,
		executor,
		clusterConfiguration);

	final boolean webSubmitEnabled = clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE);

	if (!webSubmitEnabled) {
		log.info("Web-based job submission is not enabled.");
	} else {
		try {
			webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
				leaderRetriever,
				requestTimeout,
				responseHeaders,
				localAddressFuture,
				uploadDir,
				executor,
				clusterConfiguration);

			// the extension contributes its own handlers
			allHandlers.addAll(webSubmissionExtension.getHandlers());
		} catch (FlinkException e) {
			// a missing extension is tolerated; only debug logging shows the cause
			if (log.isDebugEnabled()) {
				log.debug("Failed to load web based job submission extension.", e);
			} else {
				log.info("Failed to load web based job submission extension. " +
					"Probable reason: flink-runtime-web is not in the classpath.");
			}
		}
	}

	allHandlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler));

	return allHandlers;
}
 
示例20
/**
 * Builds the {@link JobDetails} of the execution graph.
 *
 * <p>Asserts that the caller is running in the main thread before the graph is read.
 *
 * @return the job details created from the execution graph
 */
@Override
public JobDetails requestJobDetails() {
	mainThreadExecutor.assertRunningInMainThread();

	final JobDetails currentDetails = WebMonitorUtils.createDetailsForJob(executionGraph);
	return currentDetails;
}
 
示例21
/**
 * Converts every stored execution graph into its {@link JobDetails}.
 *
 * @return a newly collected list with one entry per stored execution graph
 */
@Override
public Collection<JobDetails> getAvailableJobDetails() {
	return serializableExecutionGraphs
		.values()
		.stream()
		.map(storedGraph -> WebMonitorUtils.createDetailsForJob(storedGraph))
		.collect(Collectors.toList());
}
 
示例22
/**
 * Extends the handlers created by the superclass with the Dispatcher specific ones.
 *
 * <p>The job submit handler is always registered. If the REST configuration reports web
 * submission as enabled, the handlers of the web submission extension are registered as well.
 * A failure to load the extension is only logged; the stack trace is included only when debug
 * logging is enabled.
 *
 * @param localAddressFuture future passed on to the superclass and to the web submission extension
 * @return the handlers of the superclass plus the Dispatcher specific handlers
 */
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {
	final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> allHandlers = super.initializeHandlers(localAddressFuture);

	final Time requestTimeout = restConfiguration.getTimeout();

	final JobSubmitHandler submitHandler = new JobSubmitHandler(
		leaderRetriever,
		requestTimeout,
		responseHeaders,
		executor,
		clusterConfiguration);

	final boolean webSubmitEnabled = restConfiguration.isWebSubmitEnabled();

	if (!webSubmitEnabled) {
		log.info("Web-based job submission is not enabled.");
	} else {
		try {
			webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
				leaderRetriever,
				requestTimeout,
				responseHeaders,
				localAddressFuture,
				uploadDir,
				executor,
				clusterConfiguration);

			// the extension contributes its own handlers
			allHandlers.addAll(webSubmissionExtension.getHandlers());
		} catch (FlinkException e) {
			// a missing extension is tolerated; only debug logging shows the cause
			if (log.isDebugEnabled()) {
				log.debug("Failed to load web based job submission extension.", e);
			} else {
				log.info("Failed to load web based job submission extension. " +
					"Probable reason: flink-runtime-web is not in the classpath.");
			}
		}
	}

	allHandlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler));

	return allHandlers;
}
 
示例23
/**
 * Builds the {@link JobDetails} of the execution graph.
 *
 * <p>Asserts that the caller is running in the main thread before the graph is read.
 *
 * @return the job details created from the execution graph
 */
@Override
public JobDetails requestJobDetails() {
	mainThreadExecutor.assertRunningInMainThread();

	final JobDetails currentDetails = WebMonitorUtils.createDetailsForJob(executionGraph);
	return currentDetails;
}
 
示例24
/**
 * Maps every given execution graph to its {@link JobDetails}.
 *
 * @param executionGraphs graphs to convert
 * @return a newly collected list with one entry per given graph, in the collection's iteration order
 */
private static Collection<JobDetails> generateJobDetails(Collection<ArchivedExecutionGraph> executionGraphs) {
	return executionGraphs
		.stream()
		.map(graph -> WebMonitorUtils.createDetailsForJob(graph))
		.collect(Collectors.toList());
}