Java源码示例:scala.concurrent.duration.Deadline

示例1
/**
 * Submits the job built from {@code plan} in detached mode, waits for it to report
 * {@code RUNNING}, cancels it after a pause and checks that it ends up {@code CANCELED}.
 *
 * @param plan plan that is translated into the job graph to run
 * @param msecsTillCanceling pause in milliseconds between seeing RUNNING and issuing the cancel
 * @param maxTimeTillCanceled milliseconds the job is given to reach CANCELED after the cancel call
 * @throws Exception if submitting, querying the status of, or cancelling the job fails
 */
protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
	final JobGraph jobGraph = getJobGraph(plan);

	ClusterClient<?> client = CLUSTER.getClusterClient();
	client.setDetached(true);
	JobSubmissionResult submission = client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());

	// phase 1: poll in 50 ms steps, for at most two minutes, until the job is RUNNING
	final Deadline startupDeadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
	JobStatus statusBeforeCancel;
	while (true) {
		statusBeforeCancel = client.getJobStatus(submission.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
		if (statusBeforeCancel == JobStatus.RUNNING || !startupDeadline.hasTimeLeft()) {
			break;
		}
		Thread.sleep(50);
	}
	if (statusBeforeCancel != JobStatus.RUNNING) {
		Assert.fail("Job not in state RUNNING.");
	}

	// phase 2: let the job run for the requested time, then cancel it
	Thread.sleep(msecsTillCanceling);
	client.cancel(submission.getJobID());

	// phase 3: poll in 50 ms steps until the job is CANCELED or maxTimeTillCanceled has passed
	final Deadline cancellationDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
	JobStatus statusAfterCancel;
	while (true) {
		statusAfterCancel = client.getJobStatus(submission.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
		if (statusAfterCancel == JobStatus.CANCELED || !cancellationDeadline.hasTimeLeft()) {
			break;
		}
		Thread.sleep(50);
	}
	if (statusAfterCancel != JobStatus.CANCELED) {
		Assert.fail("Failed to cancel job with ID " + submission.getJobID() + '.');
	}
}
 
示例2
/**
 * Blocks until the dispatcher's cluster overview reports at least {@code numberOfTaskManagers}
 * connected task managers. The overview is requested repeatedly with a 50 ms retry delay, and
 * {@code timeLeft} bounds the whole wait.
 *
 * <p>The scheduler that drives the retries is private to this call and is shut down before the
 * method returns, so no scheduler thread outlives the wait.
 *
 * @param numberOfTaskManagers minimum number of connected task managers to wait for
 * @param dispatcherGateway gateway that is asked for the cluster overview
 * @param timeLeft time budget; used both as the per-request timeout and as the retry deadline
 * @throws ExecutionException if the retrying future completes exceptionally
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
	// Keep a handle on the executor so it can be released; otherwise its worker thread leaks.
	final java.util.concurrent.ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
	try {
		FutureUtils.retrySuccessfulWithDelay(
			() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
			Time.milliseconds(50L),
			org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
			new ScheduledExecutorServiceAdapter(retryExecutor))
			.get();
	} finally {
		// also runs on failure/interruption so that pending retries are dropped
		retryExecutor.shutdownNow();
	}
}
 
示例3
/**
 * Runs the job derived from {@code plan} in detached mode and cancels it: first waits for the
 * {@code RUNNING} status, then sleeps, cancels, and finally waits for {@code CANCELED}.
 *
 * @param plan plan that is translated into the job graph to run
 * @param msecsTillCanceling milliseconds to sleep after the job was seen RUNNING, before cancelling
 * @param maxTimeTillCanceled milliseconds allowed for the job to become CANCELED after the cancel call
 * @throws Exception if submitting, querying the status of, or cancelling the job fails
 */
protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
	final JobGraph jobGraph = getJobGraph(plan);

	ClusterClient<?> client = CLUSTER.getClusterClient();
	client.setDetached(true);
	JobSubmissionResult submitted = client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());

	// wait (50 ms polling, two minute limit) for the job to start running
	final Deadline runningDeadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
	JobStatus currentStatus;
	while (true) {
		currentStatus = client.getJobStatus(submitted.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
		if (currentStatus == JobStatus.RUNNING || !runningDeadline.hasTimeLeft()) {
			break;
		}
		Thread.sleep(50);
	}
	if (currentStatus != JobStatus.RUNNING) {
		Assert.fail("Job not in state RUNNING.");
	}

	// keep the job running for the configured time before cancelling
	Thread.sleep(msecsTillCanceling);
	client.cancel(submitted.getJobID());

	// wait (50 ms polling) for the cancellation to take effect within maxTimeTillCanceled
	final Deadline canceledDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
	JobStatus finalStatus;
	while (true) {
		finalStatus = client.getJobStatus(submitted.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
		if (finalStatus == JobStatus.CANCELED || !canceledDeadline.hasTimeLeft()) {
			break;
		}
		Thread.sleep(50);
	}
	if (finalStatus != JobStatus.CANCELED) {
		Assert.fail("Failed to cancel job with ID " + submitted.getJobID() + '.');
	}
}
 
示例4
/**
 * Waits until at least {@code numberOfTaskManagers} task managers show up as connected in the
 * cluster overview returned by the dispatcher. Requests are retried with a 50 ms delay and the
 * overall wait is limited by {@code timeLeft}.
 *
 * <p>A dedicated single-thread scheduler drives the retries; it is always shut down before this
 * method returns so that its thread is not leaked.
 *
 * @param numberOfTaskManagers minimum number of connected task managers to wait for
 * @param dispatcherGateway gateway that is asked for the cluster overview
 * @param timeLeft time budget; used both as the per-request timeout and as the retry deadline
 * @throws ExecutionException if the retrying future completes exceptionally
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
	// Hold on to the executor: an executor that is never shut down keeps its worker thread alive.
	final java.util.concurrent.ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
	try {
		FutureUtils.retrySuccessfulWithDelay(
			() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
			Time.milliseconds(50L),
			org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
			new ScheduledExecutorServiceAdapter(scheduler))
			.get();
	} finally {
		// release the scheduler on success, failure and interruption alike
		scheduler.shutdownNow();
	}
}
 
示例5
/**
 * Submits the job built from {@code plan}, waits for it to report {@code RUNNING}, cancels it
 * after a pause and checks that it ends up {@code CANCELED}. Every status request is bounded by
 * the timeout read from {@code configuration}.
 *
 * @param plan plan that is translated into the job graph to run
 * @param msecsTillCanceling pause in milliseconds between seeing RUNNING and issuing the cancel
 * @param maxTimeTillCanceled milliseconds the job is given to reach CANCELED after the cancel completed
 * @throws Exception if submitting, querying the status of, or cancelling the job fails
 */
protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
	final JobGraph jobGraph = getJobGraph(plan);

	// timeout applied to each individual status request below
	final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();

	ClusterClient<?> client = CLUSTER.getClusterClient();
	JobSubmissionResult submission = ClientUtils.submitJob(client, jobGraph);

	// phase 1: poll in 50 ms steps, for at most two minutes, until the job is RUNNING
	final Deadline startupDeadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
	JobStatus statusBeforeCancel;
	while (true) {
		statusBeforeCancel = client.getJobStatus(submission.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
		if (statusBeforeCancel == JobStatus.RUNNING || !startupDeadline.hasTimeLeft()) {
			break;
		}
		Thread.sleep(50);
	}
	if (statusBeforeCancel != JobStatus.RUNNING) {
		Assert.fail("Job not in state RUNNING.");
	}

	// phase 2: let the job run for the requested time, then cancel and wait for the cancel call itself
	Thread.sleep(msecsTillCanceling);
	client.cancel(submission.getJobID()).get();

	// phase 3: poll in 50 ms steps until the job is CANCELED or maxTimeTillCanceled has passed
	final Deadline cancellationDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
	JobStatus statusAfterCancel;
	while (true) {
		statusAfterCancel = client.getJobStatus(submission.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
		if (statusAfterCancel == JobStatus.CANCELED || !cancellationDeadline.hasTimeLeft()) {
			break;
		}
		Thread.sleep(50);
	}
	if (statusAfterCancel != JobStatus.CANCELED) {
		Assert.fail("Failed to cancel job with ID " + submission.getJobID() + '.');
	}
}
 
示例6
/**
 * Tests disposal of a savepoint, which contains custom user code KvState.
 *
 * <p>Flow: start the custom-KvState program on a background thread, wait for a job in state
 * {@code RUNNING}, trigger a savepoint (retrying up to 20 times until one succeeds), dispose
 * that savepoint, cancel the job and make sure the background thread terminates. The timed
 * waits share a single 100 second deadline.
 */
@Test
public void testDisposeSavepointWithCustomKvState() throws Exception {
	ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster());

	Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();

	File checkpointDir = FOLDER.newFolder();
	File outputDir = FOLDER.newFolder();

	final PackagedProgram program = new PackagedProgram(
			new File(CUSTOM_KV_STATE_JAR_PATH),
			new String[] {
					String.valueOf(parallelism),
					checkpointDir.toURI().toString(),
					"5000",
					outputDir.toURI().toString()
			});

	TestStreamEnvironment.setAsContext(
		miniClusterResource.getMiniCluster(),
		parallelism,
		Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
		Collections.<URL>emptyList()
	);

	// Execute detached
	Thread invokeThread = new Thread(new Runnable() {
		@Override
		public void run() {
			try {
				program.invokeInteractiveModeForExecution();
			} catch (ProgramInvocationException ex) {
				// The test cancels the job itself, so a cancellation cause is expected;
				// anything else (including a missing cause) is reported.
				if (!(ex.getCause() instanceof JobCancellationException)) {
					ex.printStackTrace();
				}
			}
		}
	});

	LOG.info("Starting program invoke thread");
	invokeThread.start();

	// The job ID
	JobID jobId = null;

	LOG.info("Waiting for job status running.");

	// Wait for running job
	while (jobId == null && deadline.hasTimeLeft()) {

		Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
		for (JobStatusMessage job : jobs) {
			if (job.getJobState() == JobStatus.RUNNING) {
				jobId = job.getJobId();
				LOG.info("Job running. ID: " + jobId);
				break;
			}
		}

		// Retry if job is not available yet
		if (jobId == null) {
			Thread.sleep(100L);
		}
	}

	// Fail with a clear message instead of passing a null job ID to the savepoint call.
	assertNotNull("No job reached state RUNNING before the deadline", jobId);

	// Trigger savepoint; stop retrying as soon as one attempt succeeds so that exactly one
	// savepoint is created and later disposed.
	String savepointPath = null;
	for (int i = 0; i < 20 && savepointPath == null; i++) {
		LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
		try {
			savepointPath = clusterClient.triggerSavepoint(jobId, null)
				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
		} catch (Exception cause) {
			LOG.info("Failed to trigger savepoint. Retrying...", cause);
			// This can fail if the operators are not opened yet
			Thread.sleep(500);
		}
	}

	assertNotNull("Failed to trigger savepoint", savepointPath);

	clusterClient.disposeSavepoint(savepointPath).get();

	clusterClient.cancel(jobId);

	// make sure, the execution is finished to not influence other test methods
	invokeThread.join(deadline.timeLeft().toMillis());
	assertFalse("Program invoke thread still running", invokeThread.isAlive());
}
 
示例7
/**
 * Submits a job with a single {@code BlockingInvokable} vertex, sends a GET request for
 * {@code /jobs/<jobId>/yarn-stop} and expects an {@code ACCEPTED} response with an empty JSON
 * object as body, then waits until no job is running any more.
 */
@Test
public void testStopYarn() throws Exception {
	// precondition: no job may be active when the test starts
	assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());

	// job graph consisting of one vertex with parallelism 2
	final JobVertex blockingVertex = new JobVertex("Sender");
	blockingVertex.setParallelism(2);
	blockingVertex.setInvokableClass(BlockingInvokable.class);

	final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", blockingVertex);
	final JobID jobId = jobGraph.getJobID();

	ClusterClient<?> submitter = CLUSTER.getClusterClient();
	submitter.setDetached(true);
	submitter.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());

	// poll in 10 ms steps until the job is listed as running
	while (true) {
		if (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
			break;
		}
		Thread.sleep(10);
	}

	// block until the invokable's latch is released
	BlockingInvokable.latch.await();

	// two minute budget shared by sending the request and reading the response
	final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();

	try (HttpTestClient httpClient = new HttpTestClient("localhost", getRestPort())) {
		// ask the REST endpoint to stop the job
		final String stopPath = "/jobs/" + jobId + "/yarn-stop";
		httpClient.sendGetRequest(stopPath, deadline.timeLeft());

		final HttpTestClient.SimpleHttpResponse reply = httpClient.getNextResponse(deadline.timeLeft());

		assertEquals(HttpResponseStatus.ACCEPTED, reply.getStatus());
		assertEquals("application/json; charset=UTF-8", reply.getType());
		assertEquals("{}", reply.getContent());
	}

	// poll in 20 ms steps until the job is gone again
	while (true) {
		if (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
			break;
		}
		Thread.sleep(20);
	}

	BlockingInvokable.reset();
}
 
示例8
/**
 * Tests disposal of a savepoint, which contains custom user code KvState.
 *
 * <p>The program is started on a background thread. Once a job is seen in state
 * {@code RUNNING}, a savepoint is triggered (at most 20 attempts, stopping at the first
 * success), disposed again, and the job is cancelled. The timed waits are bounded by one
 * shared 100 second deadline.
 */
@Test
public void testDisposeSavepointWithCustomKvState() throws Exception {
	ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster());

	Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();

	File checkpointDir = FOLDER.newFolder();
	File outputDir = FOLDER.newFolder();

	final PackagedProgram program = new PackagedProgram(
			new File(CUSTOM_KV_STATE_JAR_PATH),
			new String[] {
					String.valueOf(parallelism),
					checkpointDir.toURI().toString(),
					"5000",
					outputDir.toURI().toString()
			});

	TestStreamEnvironment.setAsContext(
		miniClusterResource.getMiniCluster(),
		parallelism,
		Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
		Collections.<URL>emptyList()
	);

	// Execute detached
	Thread invokeThread = new Thread(new Runnable() {
		@Override
		public void run() {
			try {
				program.invokeInteractiveModeForExecution();
			} catch (ProgramInvocationException ex) {
				// Cancellation is triggered by this test and therefore not worth reporting;
				// every other failure (also one without a cause) is printed.
				if (!(ex.getCause() instanceof JobCancellationException)) {
					ex.printStackTrace();
				}
			}
		}
	});

	LOG.info("Starting program invoke thread");
	invokeThread.start();

	// The job ID
	JobID jobId = null;

	LOG.info("Waiting for job status running.");

	// Wait for running job
	while (jobId == null && deadline.hasTimeLeft()) {

		Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
		for (JobStatusMessage job : jobs) {
			if (job.getJobState() == JobStatus.RUNNING) {
				jobId = job.getJobId();
				LOG.info("Job running. ID: " + jobId);
				break;
			}
		}

		// Retry if job is not available yet
		if (jobId == null) {
			Thread.sleep(100L);
		}
	}

	// Without a running job there is nothing to take a savepoint of; fail early and clearly.
	assertNotNull("No job reached state RUNNING before the deadline", jobId);

	// Trigger savepoint; the loop is a retry loop and ends with the first successful attempt,
	// so no additional savepoints are created that would never be disposed.
	String savepointPath = null;
	for (int i = 0; i < 20 && savepointPath == null; i++) {
		LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
		try {
			savepointPath = clusterClient.triggerSavepoint(jobId, null)
				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
		} catch (Exception cause) {
			LOG.info("Failed to trigger savepoint. Retrying...", cause);
			// This can fail if the operators are not opened yet
			Thread.sleep(500);
		}
	}

	assertNotNull("Failed to trigger savepoint", savepointPath);

	clusterClient.disposeSavepoint(savepointPath).get();

	clusterClient.cancel(jobId);

	// make sure, the execution is finished to not influence other test methods
	invokeThread.join(deadline.timeLeft().toMillis());
	assertFalse("Program invoke thread still running", invokeThread.isAlive());
}
 
示例9
/**
 * Submits a job with a single {@code BlockingInvokable} vertex, sends a GET request for
 * {@code /jobs/<jobId>/yarn-cancel} and expects an {@code ACCEPTED} response with an empty JSON
 * object as body, then waits until no job is running any more.
 */
@Test
public void testCancelYarn() throws Exception {
	// precondition: no job may be active when the test starts
	assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());

	// job graph consisting of one vertex with parallelism 2
	final JobVertex blockingVertex = new JobVertex("Sender");
	blockingVertex.setParallelism(2);
	blockingVertex.setInvokableClass(BlockingInvokable.class);

	final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", blockingVertex);
	final JobID jobId = jobGraph.getJobID();

	ClusterClient<?> submitter = CLUSTER.getClusterClient();
	submitter.setDetached(true);
	submitter.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());

	// poll in 10 ms steps until the job is listed as running
	while (true) {
		if (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
			break;
		}
		Thread.sleep(10);
	}

	// block until the invokable's latch is released
	BlockingInvokable.latch.await();

	// two minute budget shared by sending the request and reading the response
	final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();

	try (HttpTestClient httpClient = new HttpTestClient("localhost", getRestPort())) {
		// ask the REST endpoint to cancel the job
		final String cancelPath = "/jobs/" + jobId + "/yarn-cancel";
		httpClient.sendGetRequest(cancelPath, deadline.timeLeft());

		final HttpTestClient.SimpleHttpResponse reply = httpClient.getNextResponse(deadline.timeLeft());

		assertEquals(HttpResponseStatus.ACCEPTED, reply.getStatus());
		assertEquals("application/json; charset=UTF-8", reply.getType());
		assertEquals("{}", reply.getContent());
	}

	// poll in 20 ms steps until the job is gone again
	while (true) {
		if (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
			break;
		}
		Thread.sleep(20);
	}

	BlockingInvokable.reset();
}
 
示例10
/**
 * Writes {@code MESSAGES_NUM} messages to the queue {@code queue2} through an {@code AMQSink},
 * reads them back with an {@code AMQSource} running on a background thread, and keeps
 * snapshotting the source and completing the checkpoint with random checkpoint IDs while it
 * reads. The test passes when the source context has seen {@code MESSAGES_NUM} IDs within
 * five seconds.
 */
@Test
public void amqTopologyWithCheckpointing() throws Exception {
    ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
    AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
        .setConnectionFactory(connectionFactory)
        .setDestinationName("queue2")
        .setSerializationSchema(new SimpleStringSchema())
        .build();
    AMQSink<String> sink = new AMQSink<>(sinkConfig);
    sink.open(new Configuration());

    // fill the queue before the source is started
    for (int i = 0; i < MESSAGES_NUM; i++) {
        sink.invoke("amq-" + i, null);
    }

    AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
        .setConnectionFactory(connectionFactory)
        .setDestinationName("queue2")
        .setDeserializationSchema(new SimpleStringSchema())
        .build();

    final AMQSource<String> source = new AMQSource<>(sourceConfig);
    RuntimeContext runtimeContext = createMockRuntimeContext();
    source.setRuntimeContext(runtimeContext);
    source.open(new Configuration());

    // run the source on its own thread so this thread can drive the checkpoints
    final TestSourceContext sourceContext = new TestSourceContext();
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                source.run(sourceContext);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
    thread.start();

    // one generator for the whole loop instead of a fresh instance per iteration
    final Random random = new Random();
    Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) {
        Thread.sleep(100);
        final long checkpointId = random.nextLong();
        // snapshot and completion notification happen under the source context's checkpoint lock
        synchronized (sourceContext.getCheckpointLock()) {
            source.snapshotState(new FunctionSnapshotContext() {
                @Override
                public long getCheckpointId() {
                    return checkpointId;
                }

                @Override
                public long getCheckpointTimestamp() {
                    return System.currentTimeMillis();
                }
            });
            source.notifyCheckpointComplete(checkpointId);
        }
    }
    assertEquals(MESSAGES_NUM, sourceContext.getIdsNum());
}
 
示例11
/**
 * Tests disposal of a savepoint, which contains custom user code KvState.
 *
 * <p>Flow: start the custom-KvState program on a background thread, wait for a job in state
 * {@code RUNNING}, trigger a savepoint (retrying up to 20 times until one succeeds), dispose
 * that savepoint, cancel the job and make sure the background thread terminates. The timed
 * waits share a single 100 second deadline.
 */
@Test
public void testDisposeSavepointWithCustomKvState() throws Exception {
	ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster());

	Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();

	File checkpointDir = FOLDER.newFolder();
	File outputDir = FOLDER.newFolder();

	final PackagedProgram program = PackagedProgram.newBuilder()
		.setJarFile(new File(CUSTOM_KV_STATE_JAR_PATH))
		.setArguments(new String[] {
			String.valueOf(parallelism),
			checkpointDir.toURI().toString(),
			"5000",
			outputDir.toURI().toString(),
			"false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
		})
		.build();

	TestStreamEnvironment.setAsContext(
		miniClusterResource.getMiniCluster(),
		parallelism,
		Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
		Collections.emptyList()
	);

	// Execute detached
	Thread invokeThread = new Thread(() -> {
		try {
			program.invokeInteractiveModeForExecution();
		} catch (ProgramInvocationException ex) {
			// The test cancels the job itself, so a cancellation cause is expected;
			// anything else (including a missing cause) is reported.
			if (!(ex.getCause() instanceof JobCancellationException)) {
				ex.printStackTrace();
			}
		}
	});

	LOG.info("Starting program invoke thread");
	invokeThread.start();

	// The job ID
	JobID jobId = null;

	LOG.info("Waiting for job status running.");

	// Wait for running job
	while (jobId == null && deadline.hasTimeLeft()) {

		Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
		for (JobStatusMessage job : jobs) {
			if (job.getJobState() == JobStatus.RUNNING) {
				jobId = job.getJobId();
				LOG.info("Job running. ID: " + jobId);
				break;
			}
		}

		// Retry if job is not available yet
		if (jobId == null) {
			Thread.sleep(100L);
		}
	}

	// Fail with a clear message instead of passing a null job ID to the savepoint call.
	assertNotNull("No job reached state RUNNING before the deadline", jobId);

	// Trigger savepoint; stop retrying as soon as one attempt succeeds so that exactly one
	// savepoint is created and later disposed.
	String savepointPath = null;
	for (int i = 0; i < 20 && savepointPath == null; i++) {
		LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
		try {
			savepointPath = clusterClient.triggerSavepoint(jobId, null)
				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
		} catch (Exception cause) {
			LOG.info("Failed to trigger savepoint. Retrying...", cause);
			// This can fail if the operators are not opened yet
			Thread.sleep(500);
		}
	}

	assertNotNull("Failed to trigger savepoint", savepointPath);

	clusterClient.disposeSavepoint(savepointPath).get();

	clusterClient.cancel(jobId).get();

	// make sure, the execution is finished to not influence other test methods
	invokeThread.join(deadline.timeLeft().toMillis());
	assertFalse("Program invoke thread still running", invokeThread.isAlive());
}
 
示例12
/**
 * Tests that the stream operator can snapshot and restore the operator state of chained
 * operators.
 *
 * <p>A first harness runs a chain of {@code numberChainedTasks} testing operators and takes one
 * checkpoint; the reported state is then handed to a second harness, and the test checks that
 * restore was invoked once per chained operator. The timed waits and the checkpoint-trigger
 * retry loop are bounded by one two-minute deadline.
 */
@Test
public void testSnapshottingAndRestoring() throws Exception {
	final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();

	final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
			OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

	testHarness.setupOutputForSingletonOperatorChain();

	IdentityKeySelector<String> keySelector = new IdentityKeySelector<>();
	testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);

	long checkpointId = 1L;
	long checkpointTimestamp = 1L;
	int numberChainedTasks = 11;

	StreamConfig streamConfig = testHarness.getStreamConfig();

	configureChainedTestingStreamOperator(streamConfig, numberChainedTasks);
	TestTaskStateManager taskStateManager = testHarness.taskStateManager;
	OneShotLatch waitForAcknowledgeLatch = new OneShotLatch();

	taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch);

	// reset number of restore calls
	TestingStreamOperator.numberRestoreCalls = 0;

	testHarness.invoke();
	testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());

	final OneInputStreamTask<String, String> streamTask = testHarness.getTask();

	CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);

	// Retry until the task accepts the checkpoint, but give up once the test deadline has
	// passed so that a task which never accepts it fails the test instead of spinning forever.
	while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())) {
		if (!deadline.hasTimeLeft()) {
			Assert.fail("Checkpoint could not be triggered before the test deadline elapsed.");
		}
	}

	// since no state was set, there shouldn't be restore calls
	assertEquals(0, TestingStreamOperator.numberRestoreCalls);

	waitForAcknowledgeLatch.await();

	assertEquals(checkpointId, taskStateManager.getReportedCheckpointId());

	testHarness.endInput();
	testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

	final OneInputStreamTaskTestHarness<String, String> restoredTaskHarness =
			new OneInputStreamTaskTestHarness<>(
					OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

	restoredTaskHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);

	restoredTaskHarness.setTaskStateSnapshot(checkpointId, taskStateManager.getLastJobManagerTaskStateSnapshot());

	StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig();

	configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks);

	// the snapshot must contain one entry per chained operator
	TaskStateSnapshot stateHandles = taskStateManager.getLastJobManagerTaskStateSnapshot();
	Assert.assertEquals(numberChainedTasks, stateHandles.getSubtaskStateMappings().size());

	TestingStreamOperator.numberRestoreCalls = 0;

	// transfer state to new harness
	restoredTaskHarness.taskStateManager.restoreLatestCheckpointState(
		taskStateManager.getJobManagerTaskStateSnapshotsByCheckpointId());
	restoredTaskHarness.invoke();
	restoredTaskHarness.endInput();
	restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

	// restore of every chained operator should have been called
	assertEquals(numberChainedTasks, TestingStreamOperator.numberRestoreCalls);

	// leave the static counters clean
	TestingStreamOperator.numberRestoreCalls = 0;
	TestingStreamOperator.numberSnapshotCalls = 0;
}
 
示例13
/**
 * Tests that the stream operator can snapshot and restore the operator state of chained
 * operators.
 *
 * <p>A first harness runs a chain of {@code numberChainedTasks} testing operators and takes one
 * checkpoint; the reported state is then handed to a second harness, and the test checks that
 * restore was invoked once per chained operator. The timed waits and the checkpoint-trigger
 * retry loop are bounded by one two-minute deadline.
 */
@Test
public void testSnapshottingAndRestoring() throws Exception {
	final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();

	final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
			OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

	testHarness.setupOutputForSingletonOperatorChain();

	IdentityKeySelector<String> keySelector = new IdentityKeySelector<>();
	testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);

	long checkpointId = 1L;
	long checkpointTimestamp = 1L;
	int numberChainedTasks = 11;

	StreamConfig streamConfig = testHarness.getStreamConfig();

	configureChainedTestingStreamOperator(streamConfig, numberChainedTasks);
	TestTaskStateManager taskStateManager = testHarness.taskStateManager;
	OneShotLatch waitForAcknowledgeLatch = new OneShotLatch();

	taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch);

	// reset number of restore calls
	TestingStreamOperator.numberRestoreCalls = 0;

	testHarness.invoke();
	testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());

	final OneInputStreamTask<String, String> streamTask = testHarness.getTask();

	CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);

	// Retry until the task accepts the checkpoint, but give up once the test deadline has
	// passed so that a task which never accepts it fails the test instead of spinning forever.
	while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false)) {
		if (!deadline.hasTimeLeft()) {
			Assert.fail("Checkpoint could not be triggered before the test deadline elapsed.");
		}
	}

	// since no state was set, there shouldn't be restore calls
	assertEquals(0, TestingStreamOperator.numberRestoreCalls);

	waitForAcknowledgeLatch.await();

	assertEquals(checkpointId, taskStateManager.getReportedCheckpointId());

	testHarness.endInput();
	testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

	final OneInputStreamTaskTestHarness<String, String> restoredTaskHarness =
			new OneInputStreamTaskTestHarness<>(
					OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

	restoredTaskHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);

	restoredTaskHarness.setTaskStateSnapshot(checkpointId, taskStateManager.getLastJobManagerTaskStateSnapshot());

	StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig();

	configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks);

	// the snapshot must contain one entry per chained operator
	TaskStateSnapshot stateHandles = taskStateManager.getLastJobManagerTaskStateSnapshot();
	Assert.assertEquals(numberChainedTasks, stateHandles.getSubtaskStateMappings().size());

	TestingStreamOperator.numberRestoreCalls = 0;

	// transfer state to new harness
	restoredTaskHarness.taskStateManager.restoreLatestCheckpointState(
		taskStateManager.getJobManagerTaskStateSnapshotsByCheckpointId());
	restoredTaskHarness.invoke();
	restoredTaskHarness.endInput();
	restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

	// restore of every chained operator should have been called
	assertEquals(numberChainedTasks, TestingStreamOperator.numberRestoreCalls);

	// leave the static counters clean
	TestingStreamOperator.numberRestoreCalls = 0;
	TestingStreamOperator.numberSnapshotCalls = 0;
}
 
示例14
/**
 * Verifies snapshotting and restoring of operator state across a chain of operators: a first
 * harness runs the chain and takes one checkpoint, a second harness is started from the
 * reported state, and restore must have been called once per chained operator.
 */
@Test
public void testSnapshottingAndRestoring() throws Exception {
	// time budget for the waitForTaskCompletion calls below
	final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();

	// ---- first run: execute the chain and take a checkpoint ----
	final OneInputStreamTaskTestHarness<String, String> firstHarness =
			new OneInputStreamTaskTestHarness<>(
					OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
	firstHarness.setupOutputForSingletonOperatorChain();

	IdentityKeySelector<String> identityKeys = new IdentityKeySelector<>();
	firstHarness.configureForKeyedStream(identityKeys, BasicTypeInfo.STRING_TYPE_INFO);

	long checkpointId = 1L;
	long checkpointTimestamp = 1L;
	int numberChainedTasks = 11;

	configureChainedTestingStreamOperator(firstHarness.getStreamConfig(), numberChainedTasks);

	TestTaskStateManager stateManager = firstHarness.taskStateManager;
	OneShotLatch ackLatch = new OneShotLatch();
	stateManager.setWaitForReportLatch(ackLatch);

	// start counting restore calls from zero
	TestingStreamOperator.numberRestoreCalls = 0;

	firstHarness.invoke();
	firstHarness.waitForTaskRunning();

	final OneInputStreamTask<String, String> task = firstHarness.getTask();
	CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
	task.triggerCheckpointAsync(metaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false).get();

	// nothing was restored in the first run, so the counter must still be zero
	assertEquals(0, TestingStreamOperator.numberRestoreCalls);

	ackLatch.await();
	assertEquals(checkpointId, stateManager.getReportedCheckpointId());

	firstHarness.endInput();
	firstHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

	// ---- second run: start a fresh harness from the reported state ----
	final OneInputStreamTaskTestHarness<String, String> secondHarness =
			new OneInputStreamTaskTestHarness<>(
					OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
	secondHarness.configureForKeyedStream(identityKeys, BasicTypeInfo.STRING_TYPE_INFO);
	secondHarness.setTaskStateSnapshot(checkpointId, stateManager.getLastJobManagerTaskStateSnapshot());

	configureChainedTestingStreamOperator(secondHarness.getStreamConfig(), numberChainedTasks);

	// the snapshot is expected to hold one mapping per chained operator
	TaskStateSnapshot lastSnapshot = stateManager.getLastJobManagerTaskStateSnapshot();
	Assert.assertEquals(numberChainedTasks, lastSnapshot.getSubtaskStateMappings().size());

	TestingStreamOperator.numberRestoreCalls = 0;

	// hand the recorded snapshots over to the second harness and run it to completion
	secondHarness.taskStateManager.restoreLatestCheckpointState(
		stateManager.getJobManagerTaskStateSnapshotsByCheckpointId());
	secondHarness.invoke();
	secondHarness.endInput();
	secondHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

	// each chained operator must have gone through restore exactly once
	assertEquals(numberChainedTasks, TestingStreamOperator.numberRestoreCalls);

	// reset the static counters
	TestingStreamOperator.numberRestoreCalls = 0;
	TestingStreamOperator.numberSnapshotCalls = 0;
}