Java源码示例:scala.concurrent.duration.Deadline
示例1
protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
// submit job
final JobGraph jobGraph = getJobGraph(plan);
ClusterClient<?> client = CLUSTER.getClusterClient();
client.setDetached(true);
JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());
Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
Thread.sleep(50);
jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
Assert.fail("Job not in state RUNNING.");
}
Thread.sleep(msecsTillCanceling);
client.cancel(jobSubmissionResult.getJobID());
Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
Thread.sleep(50);
jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
}
if (jobStatusAfterCancel != JobStatus.CANCELED) {
Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
}
}
示例2
private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
FutureUtils.retrySuccessfulWithDelay(
() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
Time.milliseconds(50L),
org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor()))
.get();
}
示例3
protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
// submit job
final JobGraph jobGraph = getJobGraph(plan);
ClusterClient<?> client = CLUSTER.getClusterClient();
client.setDetached(true);
JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());
Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
Thread.sleep(50);
jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
Assert.fail("Job not in state RUNNING.");
}
Thread.sleep(msecsTillCanceling);
client.cancel(jobSubmissionResult.getJobID());
Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
Thread.sleep(50);
jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
}
if (jobStatusAfterCancel != JobStatus.CANCELED) {
Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
}
}
示例4
private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
FutureUtils.retrySuccessfulWithDelay(
() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
Time.milliseconds(50L),
org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor()))
.get();
}
示例5
protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
// submit job
final JobGraph jobGraph = getJobGraph(plan);
final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();
ClusterClient<?> client = CLUSTER.getClusterClient();
JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
Thread.sleep(50);
jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
Assert.fail("Job not in state RUNNING.");
}
Thread.sleep(msecsTillCanceling);
client.cancel(jobSubmissionResult.getJobID()).get();
Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
Thread.sleep(50);
jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
}
if (jobStatusAfterCancel != JobStatus.CANCELED) {
Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
}
}
示例6
/**
* Tests disposal of a savepoint, which contains custom user code KvState.
*/
@Test
public void testDisposeSavepointWithCustomKvState() throws Exception {
ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster());
Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
File checkpointDir = FOLDER.newFolder();
File outputDir = FOLDER.newFolder();
final PackagedProgram program = new PackagedProgram(
new File(CUSTOM_KV_STATE_JAR_PATH),
new String[] {
String.valueOf(parallelism),
checkpointDir.toURI().toString(),
"5000",
outputDir.toURI().toString()
});
TestStreamEnvironment.setAsContext(
miniClusterResource.getMiniCluster(),
parallelism,
Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
Collections.<URL>emptyList()
);
// Execute detached
Thread invokeThread = new Thread(new Runnable() {
@Override
public void run() {
try {
program.invokeInteractiveModeForExecution();
} catch (ProgramInvocationException ignored) {
if (ignored.getCause() == null ||
!(ignored.getCause() instanceof JobCancellationException)) {
ignored.printStackTrace();
}
}
}
});
LOG.info("Starting program invoke thread");
invokeThread.start();
// The job ID
JobID jobId = null;
LOG.info("Waiting for job status running.");
// Wait for running job
while (jobId == null && deadline.hasTimeLeft()) {
Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
for (JobStatusMessage job : jobs) {
if (job.getJobState() == JobStatus.RUNNING) {
jobId = job.getJobId();
LOG.info("Job running. ID: " + jobId);
break;
}
}
// Retry if job is not available yet
if (jobId == null) {
Thread.sleep(100L);
}
}
// Trigger savepoint
String savepointPath = null;
for (int i = 0; i < 20; i++) {
LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
try {
savepointPath = clusterClient.triggerSavepoint(jobId, null)
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception cause) {
LOG.info("Failed to trigger savepoint. Retrying...", cause);
// This can fail if the operators are not opened yet
Thread.sleep(500);
}
}
assertNotNull("Failed to trigger savepoint", savepointPath);
clusterClient.disposeSavepoint(savepointPath).get();
clusterClient.cancel(jobId);
// make sure, the execution is finished to not influence other test methods
invokeThread.join(deadline.timeLeft().toMillis());
assertFalse("Program invoke thread still running", invokeThread.isAlive());
}
示例7
@Test
public void testStopYarn() throws Exception {
// this only works if there is no active job at this point
assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(2);
sender.setInvokableClass(BlockingInvokable.class);
final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobID jid = jobGraph.getJobID();
ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
clusterClient.setDetached(true);
clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
// wait for job to show up
while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(10);
}
// wait for tasks to be properly running
BlockingInvokable.latch.await();
final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
final Deadline deadline = testTimeout.fromNow();
try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) {
// Request the file from the web server
client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());
assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{}", response.getContent());
}
// wait for cancellation to finish
while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(20);
}
BlockingInvokable.reset();
}
示例8
/**
* Tests disposal of a savepoint, which contains custom user code KvState.
*/
@Test
public void testDisposeSavepointWithCustomKvState() throws Exception {
ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster());
Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
File checkpointDir = FOLDER.newFolder();
File outputDir = FOLDER.newFolder();
final PackagedProgram program = new PackagedProgram(
new File(CUSTOM_KV_STATE_JAR_PATH),
new String[] {
String.valueOf(parallelism),
checkpointDir.toURI().toString(),
"5000",
outputDir.toURI().toString()
});
TestStreamEnvironment.setAsContext(
miniClusterResource.getMiniCluster(),
parallelism,
Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
Collections.<URL>emptyList()
);
// Execute detached
Thread invokeThread = new Thread(new Runnable() {
@Override
public void run() {
try {
program.invokeInteractiveModeForExecution();
} catch (ProgramInvocationException ignored) {
if (ignored.getCause() == null ||
!(ignored.getCause() instanceof JobCancellationException)) {
ignored.printStackTrace();
}
}
}
});
LOG.info("Starting program invoke thread");
invokeThread.start();
// The job ID
JobID jobId = null;
LOG.info("Waiting for job status running.");
// Wait for running job
while (jobId == null && deadline.hasTimeLeft()) {
Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
for (JobStatusMessage job : jobs) {
if (job.getJobState() == JobStatus.RUNNING) {
jobId = job.getJobId();
LOG.info("Job running. ID: " + jobId);
break;
}
}
// Retry if job is not available yet
if (jobId == null) {
Thread.sleep(100L);
}
}
// Trigger savepoint
String savepointPath = null;
for (int i = 0; i < 20; i++) {
LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
try {
savepointPath = clusterClient.triggerSavepoint(jobId, null)
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception cause) {
LOG.info("Failed to trigger savepoint. Retrying...", cause);
// This can fail if the operators are not opened yet
Thread.sleep(500);
}
}
assertNotNull("Failed to trigger savepoint", savepointPath);
clusterClient.disposeSavepoint(savepointPath).get();
clusterClient.cancel(jobId);
// make sure, the execution is finished to not influence other test methods
invokeThread.join(deadline.timeLeft().toMillis());
assertFalse("Program invoke thread still running", invokeThread.isAlive());
}
示例9
@Test
public void testCancelYarn() throws Exception {
// this only works if there is no active job at this point
assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(2);
sender.setInvokableClass(BlockingInvokable.class);
final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobID jid = jobGraph.getJobID();
ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
clusterClient.setDetached(true);
clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
// wait for job to show up
while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(10);
}
// wait for tasks to be properly running
BlockingInvokable.latch.await();
final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
final Deadline deadline = testTimeout.fromNow();
try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) {
// Request the file from the web server
client.sendGetRequest("/jobs/" + jid + "/yarn-cancel", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());
assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{}", response.getContent());
}
// wait for cancellation to finish
while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(20);
}
BlockingInvokable.reset();
}
示例10
@Test
public void amqTopologyWithCheckpointing() throws Exception {
ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
.setConnectionFactory(connectionFactory)
.setDestinationName("queue2")
.setSerializationSchema(new SimpleStringSchema())
.build();
AMQSink<String> sink = new AMQSink<>(sinkConfig);
sink.open(new Configuration());
for (int i = 0; i < MESSAGES_NUM; i++) {
sink.invoke("amq-" + i, null);
}
AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(connectionFactory)
.setDestinationName("queue2")
.setDeserializationSchema(new SimpleStringSchema())
.build();
final AMQSource<String> source = new AMQSource<>(sourceConfig);
RuntimeContext runtimeContext = createMockRuntimeContext();
source.setRuntimeContext(runtimeContext);
source.open(new Configuration());
final TestSourceContext sourceContext = new TestSourceContext();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
source.run(sourceContext);
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();
Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
while (deadline.hasTimeLeft() && sourceContext.getIdsNum() < MESSAGES_NUM) {
Thread.sleep(100);
Random random = new Random();
final long checkpointId = random.nextLong();
synchronized (sourceContext.getCheckpointLock()) {
source.snapshotState(new FunctionSnapshotContext() {
@Override
public long getCheckpointId() {
return checkpointId;
}
@Override
public long getCheckpointTimestamp() {
return System.currentTimeMillis();
}
});
source.notifyCheckpointComplete(checkpointId);
}
}
assertEquals(MESSAGES_NUM, sourceContext.getIdsNum());
}
示例11
/**
* Tests disposal of a savepoint, which contains custom user code KvState.
*/
@Test
public void testDisposeSavepointWithCustomKvState() throws Exception {
ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster());
Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
File checkpointDir = FOLDER.newFolder();
File outputDir = FOLDER.newFolder();
final PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(new File(CUSTOM_KV_STATE_JAR_PATH))
.setArguments(new String[] {
String.valueOf(parallelism),
checkpointDir.toURI().toString(),
"5000",
outputDir.toURI().toString(),
"false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
})
.build();
TestStreamEnvironment.setAsContext(
miniClusterResource.getMiniCluster(),
parallelism,
Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
Collections.emptyList()
);
// Execute detached
Thread invokeThread = new Thread(() -> {
try {
program.invokeInteractiveModeForExecution();
} catch (ProgramInvocationException ex) {
if (ex.getCause() == null ||
!(ex.getCause() instanceof JobCancellationException)) {
ex.printStackTrace();
}
}
});
LOG.info("Starting program invoke thread");
invokeThread.start();
// The job ID
JobID jobId = null;
LOG.info("Waiting for job status running.");
// Wait for running job
while (jobId == null && deadline.hasTimeLeft()) {
Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
for (JobStatusMessage job : jobs) {
if (job.getJobState() == JobStatus.RUNNING) {
jobId = job.getJobId();
LOG.info("Job running. ID: " + jobId);
break;
}
}
// Retry if job is not available yet
if (jobId == null) {
Thread.sleep(100L);
}
}
// Trigger savepoint
String savepointPath = null;
for (int i = 0; i < 20; i++) {
LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
try {
savepointPath = clusterClient.triggerSavepoint(jobId, null)
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception cause) {
LOG.info("Failed to trigger savepoint. Retrying...", cause);
// This can fail if the operators are not opened yet
Thread.sleep(500);
}
}
assertNotNull("Failed to trigger savepoint", savepointPath);
clusterClient.disposeSavepoint(savepointPath).get();
clusterClient.cancel(jobId).get();
// make sure, the execution is finished to not influence other test methods
invokeThread.join(deadline.timeLeft().toMillis());
assertFalse("Program invoke thread still running", invokeThread.isAlive());
}
示例12
/**
* Tests that the stream operator can snapshot and restore the operator state of chained
* operators.
*/
@Test
public void testSnapshottingAndRestoring() throws Exception {
final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
IdentityKeySelector<String> keySelector = new IdentityKeySelector<>();
testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
long checkpointId = 1L;
long checkpointTimestamp = 1L;
int numberChainedTasks = 11;
StreamConfig streamConfig = testHarness.getStreamConfig();
configureChainedTestingStreamOperator(streamConfig, numberChainedTasks);
TestTaskStateManager taskStateManager = testHarness.taskStateManager;
OneShotLatch waitForAcknowledgeLatch = new OneShotLatch();
taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch);
// reset number of restore calls
TestingStreamOperator.numberRestoreCalls = 0;
testHarness.invoke();
testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());
final OneInputStreamTask<String, String> streamTask = testHarness.getTask();
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())) {}
// since no state was set, there shouldn't be restore calls
assertEquals(0, TestingStreamOperator.numberRestoreCalls);
waitForAcknowledgeLatch.await();
assertEquals(checkpointId, taskStateManager.getReportedCheckpointId());
testHarness.endInput();
testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
final OneInputStreamTaskTestHarness<String, String> restoredTaskHarness =
new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
restoredTaskHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
restoredTaskHarness.setTaskStateSnapshot(checkpointId, taskStateManager.getLastJobManagerTaskStateSnapshot());
StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig();
configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks);
TaskStateSnapshot stateHandles = taskStateManager.getLastJobManagerTaskStateSnapshot();
Assert.assertEquals(numberChainedTasks, stateHandles.getSubtaskStateMappings().size());
TestingStreamOperator.numberRestoreCalls = 0;
// transfer state to new harness
restoredTaskHarness.taskStateManager.restoreLatestCheckpointState(
taskStateManager.getJobManagerTaskStateSnapshotsByCheckpointId());
restoredTaskHarness.invoke();
restoredTaskHarness.endInput();
restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
// restore of every chained operator should have been called
assertEquals(numberChainedTasks, TestingStreamOperator.numberRestoreCalls);
TestingStreamOperator.numberRestoreCalls = 0;
TestingStreamOperator.numberSnapshotCalls = 0;
}
示例13
/**
* Tests that the stream operator can snapshot and restore the operator state of chained
* operators.
*/
@Test
public void testSnapshottingAndRestoring() throws Exception {
final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
IdentityKeySelector<String> keySelector = new IdentityKeySelector<>();
testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
long checkpointId = 1L;
long checkpointTimestamp = 1L;
int numberChainedTasks = 11;
StreamConfig streamConfig = testHarness.getStreamConfig();
configureChainedTestingStreamOperator(streamConfig, numberChainedTasks);
TestTaskStateManager taskStateManager = testHarness.taskStateManager;
OneShotLatch waitForAcknowledgeLatch = new OneShotLatch();
taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch);
// reset number of restore calls
TestingStreamOperator.numberRestoreCalls = 0;
testHarness.invoke();
testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());
final OneInputStreamTask<String, String> streamTask = testHarness.getTask();
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false)) {}
// since no state was set, there shouldn't be restore calls
assertEquals(0, TestingStreamOperator.numberRestoreCalls);
waitForAcknowledgeLatch.await();
assertEquals(checkpointId, taskStateManager.getReportedCheckpointId());
testHarness.endInput();
testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
final OneInputStreamTaskTestHarness<String, String> restoredTaskHarness =
new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
restoredTaskHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
restoredTaskHarness.setTaskStateSnapshot(checkpointId, taskStateManager.getLastJobManagerTaskStateSnapshot());
StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig();
configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks);
TaskStateSnapshot stateHandles = taskStateManager.getLastJobManagerTaskStateSnapshot();
Assert.assertEquals(numberChainedTasks, stateHandles.getSubtaskStateMappings().size());
TestingStreamOperator.numberRestoreCalls = 0;
// transfer state to new harness
restoredTaskHarness.taskStateManager.restoreLatestCheckpointState(
taskStateManager.getJobManagerTaskStateSnapshotsByCheckpointId());
restoredTaskHarness.invoke();
restoredTaskHarness.endInput();
restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
// restore of every chained operator should have been called
assertEquals(numberChainedTasks, TestingStreamOperator.numberRestoreCalls);
TestingStreamOperator.numberRestoreCalls = 0;
TestingStreamOperator.numberSnapshotCalls = 0;
}
示例14
/**
* Tests that the stream operator can snapshot and restore the operator state of chained
* operators.
*/
@Test
public void testSnapshottingAndRestoring() throws Exception {
final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
IdentityKeySelector<String> keySelector = new IdentityKeySelector<>();
testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
long checkpointId = 1L;
long checkpointTimestamp = 1L;
int numberChainedTasks = 11;
StreamConfig streamConfig = testHarness.getStreamConfig();
configureChainedTestingStreamOperator(streamConfig, numberChainedTasks);
TestTaskStateManager taskStateManager = testHarness.taskStateManager;
OneShotLatch waitForAcknowledgeLatch = new OneShotLatch();
taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch);
// reset number of restore calls
TestingStreamOperator.numberRestoreCalls = 0;
testHarness.invoke();
testHarness.waitForTaskRunning();
final OneInputStreamTask<String, String> streamTask = testHarness.getTask();
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
streamTask.triggerCheckpointAsync(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false).get();
// since no state was set, there shouldn't be restore calls
assertEquals(0, TestingStreamOperator.numberRestoreCalls);
waitForAcknowledgeLatch.await();
assertEquals(checkpointId, taskStateManager.getReportedCheckpointId());
testHarness.endInput();
testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
final OneInputStreamTaskTestHarness<String, String> restoredTaskHarness =
new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
restoredTaskHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
restoredTaskHarness.setTaskStateSnapshot(checkpointId, taskStateManager.getLastJobManagerTaskStateSnapshot());
StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig();
configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks);
TaskStateSnapshot stateHandles = taskStateManager.getLastJobManagerTaskStateSnapshot();
Assert.assertEquals(numberChainedTasks, stateHandles.getSubtaskStateMappings().size());
TestingStreamOperator.numberRestoreCalls = 0;
// transfer state to new harness
restoredTaskHarness.taskStateManager.restoreLatestCheckpointState(
taskStateManager.getJobManagerTaskStateSnapshotsByCheckpointId());
restoredTaskHarness.invoke();
restoredTaskHarness.endInput();
restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
// restore of every chained operator should have been called
assertEquals(numberChainedTasks, TestingStreamOperator.numberRestoreCalls);
TestingStreamOperator.numberRestoreCalls = 0;
TestingStreamOperator.numberSnapshotCalls = 0;
}