Java源码示例:org.apache.flink.client.cli.util.MockedCliFrontend
示例1
/**
 * Checks that {@code stop} propagates a failure as a {@link FlinkException} whose
 * throwable chain contains the expected message.
 *
 * <p>The client comes from {@code createClusterClient(FlinkException)}, a helper defined
 * outside this snippet; presumably it fails the stop request with the given exception.
 */
@Test
public void testUnknownJobId() throws Exception {
    final String expectedMessage = "Test exception";
    final ClusterClient<String> failingClient =
        createClusterClient(new FlinkException(expectedMessage));
    final MockedCliFrontend frontend = new MockedCliFrontend(failingClient);

    // Only a freshly generated job id is passed on the command line.
    final String[] args = { new JobID().toString() };

    try {
        frontend.stop(args);
        fail("Should have failed.");
    } catch (FlinkException expected) {
        // Look the message up via ExceptionUtils instead of comparing getMessage() directly.
        assertTrue(ExceptionUtils.findThrowableWithMessage(expected, expectedMessage).isPresent());
    }
}
示例2
/**
 * Runs {@code savepoint <jobId>} and checks that the client received exactly one
 * {@code triggerSavepoint} call for that job with a {@code null} target directory, and that
 * {@code buffer} (presumably the captured stdout/stderr) contains the savepoint path.
 */
@Test
public void testTriggerSavepointSuccess() throws Exception {
    replaceStdOutAndStdErr();

    final JobID expectedJobId = new JobID();
    final String expectedPath = "expectedSavepointPath";
    final ClusterClient<String> client = createClusterClient(expectedPath);

    try {
        final String[] args = { expectedJobId.toString() };
        new MockedCliFrontend(client).savepoint(args);

        verify(client, times(1)).triggerSavepoint(eq(expectedJobId), isNull(String.class));

        final String captured = buffer.toString();
        assertTrue(captured.contains(expectedPath));
    } finally {
        client.shutdown();
        restoreStdOutAndStdErr();
    }
}
示例3
/**
 * Passes a string that is not a valid job id to {@code savepoint} and expects a
 * {@link CliArgsException} whose message contains "Cannot parse JobID".
 *
 * <p>NOTE(review): the {@code RestClusterClient} created here is never shut down in this
 * test - confirm that this is intentional.
 */
@Test
public void testTriggerSavepointFailureIllegalJobID() throws Exception {
    replaceStdOutAndStdErr();

    try {
        final String[] args = { "invalid job id" };
        final CliFrontend frontend = new MockedCliFrontend(
            new RestClusterClient<>(getConfiguration(), StandaloneClusterId.getInstance()));

        try {
            frontend.savepoint(args);
            fail("Should have failed.");
        } catch (CliArgsException expected) {
            assertThat(expected.getMessage(), Matchers.containsString("Cannot parse JobID"));
        }
    } finally {
        restoreStdOutAndStdErr();
    }
}
示例4
/**
 * Runs {@code savepoint -d <path>} against a client whose dispose callback always completes
 * with {@code Acknowledge.get()}, then checks that {@code buffer} contains both the
 * savepoint path and the word "disposed".
 *
 * @throws Exception if the frontend call or the client shutdown fails
 */
@Test
public void testDisposeSavepointSuccess() throws Exception {
    replaceStdOutAndStdErr();

    String savepointPath = "expectedSavepointPath";

    // Wildcard instead of the raw ClusterClient type: the test only needs shutdown() and
    // hands the client to the frontend, so the cluster-id type parameter is irrelevant here.
    ClusterClient<?> clusterClient = new DisposeSavepointClusterClient(
        (String path) -> CompletableFuture.completedFuture(Acknowledge.get()), getConfiguration());

    try {
        CliFrontend frontend = new MockedCliFrontend(clusterClient);

        String[] parameters = { "-d", savepointPath };
        frontend.savepoint(parameters);

        String outMsg = buffer.toString();
        assertTrue(outMsg.contains(savepointPath));
        assertTrue(outMsg.contains("disposed"));
    } finally {
        clusterClient.shutdown();
        restoreStdOutAndStdErr();
    }
}
示例5
/**
 * Calls {@code stop -p test-target-dir <jobId>} on a client built from a
 * {@link FlinkException} and expects the frontend to throw a {@link FlinkException}
 * in which the expected message can be found.
 *
 * <p>{@code createClusterClient} is defined outside this snippet; presumably the returned
 * client fails the request with the given exception.
 */
@Test
public void testUnknownJobId() throws Exception {
    final String expectedMessage = "Test exception";
    final FlinkException injectedFailure = new FlinkException(expectedMessage);

    final ClusterClient<String> failingClient = createClusterClient(injectedFailure);
    final MockedCliFrontend frontend = new MockedCliFrontend(failingClient);

    final String[] args = { "-p", "test-target-dir", new JobID().toString() };

    try {
        frontend.stop(args);
        fail("Should have failed.");
    } catch (FlinkException expected) {
        assertTrue(ExceptionUtils.findThrowableWithMessage(expected, expectedMessage).isPresent());
    }
}
示例6
/**
 * Verifies the plain {@code savepoint <jobId>} call: one {@code triggerSavepoint} invocation
 * with the job id and a {@code null} directory, and the savepoint path present in
 * {@code buffer}.
 */
@Test
public void testTriggerSavepointSuccess() throws Exception {
    replaceStdOutAndStdErr();

    final JobID submittedJob = new JobID();
    final String pathToReport = "expectedSavepointPath";
    final ClusterClient<String> client = createClusterClient(pathToReport);

    try {
        final MockedCliFrontend cli = new MockedCliFrontend(client);
        cli.savepoint(new String[] { submittedJob.toString() });

        verify(client, times(1))
            .triggerSavepoint(eq(submittedJob), isNull(String.class));

        assertTrue(buffer.toString().contains(pathToReport));
    } finally {
        client.shutdown();
        restoreStdOutAndStdErr();
    }
}
示例7
/**
 * Expects {@code savepoint} to reject the argument "invalid job id" with a
 * {@link CliArgsException} mentioning "Cannot parse JobID".
 *
 * <p>NOTE(review): the {@code RestClusterClient} instance is not shut down by this test -
 * confirm whether that is acceptable.
 */
@Test
public void testTriggerSavepointFailureIllegalJobID() throws Exception {
    replaceStdOutAndStdErr();

    try {
        final CliFrontend cli = new MockedCliFrontend(
            new RestClusterClient<>(getConfiguration(), StandaloneClusterId.getInstance()));
        final String[] badArgs = { "invalid job id" };

        try {
            cli.savepoint(badArgs);
            fail("Should have failed.");
        } catch (CliArgsException parseFailure) {
            final String message = parseFailure.getMessage();
            assertThat(message, Matchers.containsString("Cannot parse JobID"));
        }
    } finally {
        restoreStdOutAndStdErr();
    }
}
示例8
/**
 * Disposes a savepoint through {@code savepoint -d <path>} using a client whose dispose
 * callback always returns an already-completed future of {@code Acknowledge.get()}.
 * The output collected in {@code buffer} must mention the path and the word "disposed".
 *
 * @throws Exception if the frontend call or the client shutdown fails
 */
@Test
public void testDisposeSavepointSuccess() throws Exception {
    replaceStdOutAndStdErr();

    String savepointPath = "expectedSavepointPath";

    // Declared with a wildcard rather than as a raw type, so no unchecked/raw-type usage
    // leaks into the test; only shutdown() and the frontend constructor use the client.
    ClusterClient<?> clusterClient = new DisposeSavepointClusterClient(
        (String path) -> CompletableFuture.completedFuture(Acknowledge.get()), getConfiguration());

    try {
        CliFrontend frontend = new MockedCliFrontend(clusterClient);

        String[] parameters = { "-d", savepointPath };
        frontend.savepoint(parameters);

        String outMsg = buffer.toString();
        assertTrue(outMsg.contains(savepointPath));
        assertTrue(outMsg.contains("disposed"));
    } finally {
        clusterClient.shutdown();
        restoreStdOutAndStdErr();
    }
}
示例9
/**
 * Calls {@code stop <jobId>} and checks, inside the client's stop-with-savepoint callback,
 * that the job id matches, the advance-to-end-of-event-time flag is {@code false} and the
 * savepoint directory is {@code null}. The test waits on a latch that the callback triggers.
 */
@Test
public void testStopWithOnlyJobId() throws Exception {
    final JobID expectedJobId = new JobID();
    final OneShotLatch stopRequested = new OneShotLatch();

    final TestingClusterClient<String> client = new TestingClusterClient<>();
    client.setStopWithSavepointFunction((actualJobId, advanceToEnd, targetDirectory) -> {
        assertThat(actualJobId, is(expectedJobId));
        assertThat(advanceToEnd, is(false));
        assertNull(targetDirectory);
        stopRequested.trigger();
        return CompletableFuture.completedFuture(targetDirectory);
    });

    final String[] args = { expectedJobId.toString() };
    new MockedCliFrontend(client).stop(args);

    // Blocks until the callback above has run.
    stopRequested.await();
}
示例10
/**
 * Calls {@code stop} with the job id as the only argument (note that no {@code -p} flag is
 * passed despite the test name) and expects the callback to see a {@code false}
 * advance-to-end flag and a {@code null} savepoint directory.
 */
@Test
public void testStopWithDefaultSavepointDir() throws Exception {
    final JobID expectedJobId = new JobID();
    final OneShotLatch callbackInvoked = new OneShotLatch();

    final TestingClusterClient<String> client = new TestingClusterClient<>();
    client.setStopWithSavepointFunction((actualJobId, advanceToEnd, targetDirectory) -> {
        assertThat(actualJobId, is(expectedJobId));
        assertThat(advanceToEnd, is(false));
        assertNull(targetDirectory);
        callbackInvoked.trigger();
        return CompletableFuture.completedFuture(targetDirectory);
    });

    final MockedCliFrontend frontend = new MockedCliFrontend(client);
    frontend.stop(new String[] { expectedJobId.toString() });

    callbackInvoked.await();
}
示例11
/**
 * Calls {@code stop -p test-target-dir <jobId>} and expects the callback to receive the
 * job id, a {@code false} advance-to-end flag and "test-target-dir" as savepoint directory.
 */
@Test
public void testStopWithExplicitSavepointDir() throws Exception {
    final JobID expectedJobId = new JobID();
    final OneShotLatch callbackInvoked = new OneShotLatch();

    final TestingClusterClient<String> client = new TestingClusterClient<>();
    client.setStopWithSavepointFunction((actualJobId, advanceToEnd, targetDirectory) -> {
        assertThat(actualJobId, is(expectedJobId));
        assertThat(advanceToEnd, is(false));
        assertThat(targetDirectory, is("test-target-dir"));
        callbackInvoked.trigger();
        return CompletableFuture.completedFuture(targetDirectory);
    });

    final String[] args = { "-p", "test-target-dir", expectedJobId.toString() };
    new MockedCliFrontend(client).stop(args);

    callbackInvoked.await();
}
示例12
/**
 * Calls {@code stop -d <jobId>} and expects the callback to receive the job id, a
 * {@code true} advance-to-end-of-event-time flag and a {@code null} savepoint directory.
 */
@Test
public void testStopOnlyWithMaxWM() throws Exception {
    final JobID expectedJobId = new JobID();
    final OneShotLatch callbackInvoked = new OneShotLatch();

    final TestingClusterClient<String> client = new TestingClusterClient<>();
    client.setStopWithSavepointFunction((actualJobId, advanceToEnd, targetDirectory) -> {
        assertThat(actualJobId, is(expectedJobId));
        assertThat(advanceToEnd, is(true));
        assertNull(targetDirectory);
        callbackInvoked.trigger();
        return CompletableFuture.completedFuture(targetDirectory);
    });

    final String[] args = { "-d", expectedJobId.toString() };
    new MockedCliFrontend(client).stop(args);

    callbackInvoked.await();
}
示例13
/**
 * Calls {@code stop -p -d <jobId>} ({@code -p} given without a value) and expects the
 * callback to receive the job id, a {@code true} advance-to-end flag and a {@code null}
 * savepoint directory.
 */
@Test
public void testStopWithMaxWMAndDefaultSavepointDir() throws Exception {
    final JobID expectedJobId = new JobID();
    final OneShotLatch callbackInvoked = new OneShotLatch();

    final TestingClusterClient<String> client = new TestingClusterClient<>();
    client.setStopWithSavepointFunction((actualJobId, advanceToEnd, targetDirectory) -> {
        assertThat(actualJobId, is(expectedJobId));
        assertThat(advanceToEnd, is(true));
        assertNull(targetDirectory);
        callbackInvoked.trigger();
        return CompletableFuture.completedFuture(targetDirectory);
    });

    final String[] args = { "-p", "-d", expectedJobId.toString() };
    new MockedCliFrontend(client).stop(args);

    callbackInvoked.await();
}
示例14
/**
 * Calls {@code stop -d -p test-target-dir <jobId>} and expects the callback to receive the
 * job id, a {@code true} advance-to-end flag and "test-target-dir" as savepoint directory.
 */
@Test
public void testStopWithMaxWMAndExplicitSavepointDir() throws Exception {
    final JobID expectedJobId = new JobID();
    final OneShotLatch callbackInvoked = new OneShotLatch();

    final TestingClusterClient<String> client = new TestingClusterClient<>();
    client.setStopWithSavepointFunction((actualJobId, advanceToEnd, targetDirectory) -> {
        assertThat(actualJobId, is(expectedJobId));
        assertThat(advanceToEnd, is(true));
        assertThat(targetDirectory, is("test-target-dir"));
        callbackInvoked.trigger();
        return CompletableFuture.completedFuture(targetDirectory);
    });

    final String[] args = { "-d", "-p", "test-target-dir", expectedJobId.toString() };
    new MockedCliFrontend(client).stop(args);

    callbackInvoked.await();
}
示例15
/**
 * Configures the client's stop-with-savepoint callback to return
 * {@code FutureUtils.completedExceptionally(testException)} and expects
 * {@code stop -p test-target-dir <jobId>} to throw a {@link FlinkException} in which the
 * expected message can be found.
 */
@Test
public void testUnknownJobId() throws Exception {
    final String expectedMessage = "Test exception";
    final FlinkException injectedFailure = new FlinkException(expectedMessage);

    final TestingClusterClient<String> client = new TestingClusterClient<>();
    client.setStopWithSavepointFunction(
        (ignoredJobId, ignoredFlag, ignoredDirectory) ->
            FutureUtils.completedExceptionally(injectedFailure));

    final MockedCliFrontend frontend = new MockedCliFrontend(client);
    final String[] args = { "-p", "test-target-dir", new JobID().toString() };

    try {
        frontend.stop(args);
        fail("Should have failed.");
    } catch (FlinkException expected) {
        assertTrue(ExceptionUtils.findThrowableWithMessage(expected, expectedMessage).isPresent());
    }
}
示例16
/**
 * Runs {@code savepoint <jobId>}, verifies a single {@code triggerSavepoint} call with the
 * job id and a {@code null} directory, and checks that {@code buffer} contains the
 * savepoint path. The client is closed and the output streams restored afterwards.
 */
@Test
public void testTriggerSavepointSuccess() throws Exception {
    replaceStdOutAndStdErr();

    final JobID expectedJobId = new JobID();
    final String expectedPath = "expectedSavepointPath";
    final ClusterClient<String> client = createClusterClient(expectedPath);

    try {
        final String[] args = { expectedJobId.toString() };
        new MockedCliFrontend(client).savepoint(args);

        verify(client, times(1)).triggerSavepoint(eq(expectedJobId), isNull(String.class));

        final String captured = buffer.toString();
        assertTrue(captured.contains(expectedPath));
    } finally {
        client.close();
        restoreStdOutAndStdErr();
    }
}
示例17
/**
 * Feeds "invalid job id" to {@code savepoint} and expects a {@link CliArgsException} whose
 * message contains "Cannot parse JobID".
 *
 * <p>NOTE(review): the {@code RestClusterClient} built for this test is never closed here -
 * confirm that this is intentional.
 */
@Test
public void testTriggerSavepointFailureIllegalJobID() throws Exception {
    replaceStdOutAndStdErr();

    try {
        final String[] args = { "invalid job id" };
        final CliFrontend frontend = new MockedCliFrontend(
            new RestClusterClient<>(getConfiguration(), StandaloneClusterId.getInstance()));

        try {
            frontend.savepoint(args);
            fail("Should have failed.");
        } catch (CliArgsException expected) {
            assertThat(expected.getMessage(), Matchers.containsString("Cannot parse JobID"));
        }
    } finally {
        restoreStdOutAndStdErr();
    }
}
示例18
/**
 * Runs {@code savepoint -d <path>} with a client whose dispose callback always completes
 * with {@code Acknowledge.get()} and asserts that {@code buffer} contains the savepoint
 * path and the word "disposed". The client is closed and the streams restored afterwards.
 *
 * @throws Exception if the frontend call or closing the client fails
 */
@Test
public void testDisposeSavepointSuccess() throws Exception {
    replaceStdOutAndStdErr();

    String savepointPath = "expectedSavepointPath";

    // Wildcard instead of the raw ClusterClient type: the test only calls close() and
    // passes the client to the frontend, so the type argument does not matter here.
    ClusterClient<?> clusterClient = new DisposeSavepointClusterClient(
        (String path) -> CompletableFuture.completedFuture(Acknowledge.get()), getConfiguration());

    try {
        CliFrontend frontend = new MockedCliFrontend(clusterClient);

        String[] parameters = { "-d", savepointPath };
        frontend.savepoint(parameters);

        String outMsg = buffer.toString();
        assertTrue(outMsg.contains(savepointPath));
        assertTrue(outMsg.contains("disposed"));
    } finally {
        clusterClient.close();
        restoreStdOutAndStdErr();
    }
}
示例19
/**
 * Calls {@code cancel <jobId>} and waits on a latch that the client's cancel callback
 * triggers. The callback returns a completed future of {@code Acknowledge.get()}.
 */
@Test
public void testCancel() throws Exception {
    final OneShotLatch cancelRequested = new OneShotLatch();

    final TestingClusterClient<String> client = new TestingClusterClient<>();
    client.setCancelFunction(ignoredJobId -> {
        cancelRequested.trigger();
        return CompletableFuture.completedFuture(Acknowledge.get());
    });

    final String[] args = { new JobID().toString() };
    new MockedCliFrontend(client).cancel(args);

    // Blocks until the cancel callback has been invoked.
    cancelRequested.await();
}
示例20
/**
 * Calls {@code stop <jobId>} and verifies that the client's {@code stop} method was invoked
 * exactly once with some {@link JobID}.
 */
@Test
public void testStop() throws Exception {
    final ClusterClient<String> client = createClusterClient(null);
    final String[] args = { new JobID().toString() };

    new MockedCliFrontend(client).stop(args);

    Mockito.verify(client, times(1)).stop(any(JobID.class));
}
示例21
/**
 * Calls {@code list -r -s -a} and verifies that the client's {@code listJobs} method was
 * invoked exactly once.
 */
@Test
public void testList() throws Exception {
    final ClusterClient<String> client = createClusterClient();
    final MockedCliFrontend frontend = new MockedCliFrontend(client);

    final String[] args = {"-r", "-s", "-a"};
    frontend.list(args);

    Mockito.verify(client, times(1)).listJobs();
}
示例22
/**
 * Runs {@code modify} with the given arguments against a {@code TestingClusterClient} built
 * around a fresh future, asserts that the future is done afterwards and returns its value.
 *
 * @param args command-line arguments handed to {@code modify}
 * @return the value the future was completed with
 * @throws Exception if the frontend call fails or the future completed exceptionally
 */
private Tuple2<JobID, Integer> callModify(String[] args) throws Exception {
    final CompletableFuture<Tuple2<JobID, Integer>> rescaleResult = new CompletableFuture<>();
    final TestingClusterClient client = new TestingClusterClient(rescaleResult, getConfiguration());

    new MockedCliFrontend(client).modify(args);

    // The frontend call is expected to have completed the future before returning.
    assertThat(rescaleResult.isDone(), Matchers.is(true));
    return rescaleResult.get();
}
示例23
/**
 * Runs {@code savepoint <jobId>} against a client from {@code createFailingClusterClient}
 * (helper not shown here) and expects a {@link FlinkException} in which the message of the
 * injected exception can be found.
 */
@Test
public void testTriggerSavepointFailure() throws Exception {
    replaceStdOutAndStdErr();

    final String expectedMessage = "expectedTestException";
    final ClusterClient<String> failingClient =
        createFailingClusterClient(new Exception(expectedMessage));

    try {
        final MockedCliFrontend frontend = new MockedCliFrontend(failingClient);
        final String[] args = { new JobID().toString() };

        try {
            frontend.savepoint(args);
            fail("Savepoint should have failed.");
        } catch (FlinkException expected) {
            assertTrue(ExceptionUtils.findThrowableWithMessage(expected, expectedMessage).isPresent());
        }
    } finally {
        failingClient.shutdown();
        restoreStdOutAndStdErr();
    }
}
示例24
/**
 * Verifies that a savepoint target directory given on the command line reaches the cluster
 * client: {@code triggerSavepoint} must be called once with the job id and that directory,
 * and {@code buffer} must contain the directory string.
 */
@Test
public void testTriggerSavepointCustomTarget() throws Exception {
    replaceStdOutAndStdErr();

    final JobID expectedJobId = new JobID();
    final String targetDirectory = "customTargetDirectory";
    final ClusterClient<String> client = createClusterClient(targetDirectory);

    try {
        final String[] args = { expectedJobId.toString(), targetDirectory };
        new MockedCliFrontend(client).savepoint(args);

        verify(client, times(1)).triggerSavepoint(eq(expectedJobId), eq(targetDirectory));

        final String captured = buffer.toString();
        assertTrue(captured.contains(targetDirectory));
    } finally {
        client.shutdown();
        restoreStdOutAndStdErr();
    }
}
示例25
/**
 * Tests disposal with a JAR file.
 *
 * <p>Runs {@code savepoint -d any-path -j <jar>} where the JAR is an empty ZIP archive
 * created in the {@code tmp} folder, and checks that the path received by the client's
 * dispose callback equals the path given on the command line.
 *
 * @throws Exception if creating the file, the frontend call or the client shutdown fails
 */
@Test
public void testDisposeWithJar() throws Exception {
    replaceStdOutAndStdErr();

    final CompletableFuture<String> disposeSavepointFuture = new CompletableFuture<>();

    final DisposeSavepointClusterClient clusterClient = new DisposeSavepointClusterClient(
        (String savepointPath) -> {
            // Record the path the frontend asked to dispose.
            disposeSavepointFuture.complete(savepointPath);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }, getConfiguration());

    try {
        CliFrontend frontend = new MockedCliFrontend(clusterClient);

        // Fake JAR file: an empty ZIP archive. try-with-resources guarantees that the
        // stream is closed on every path.
        File f = tmp.newFile();
        try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(f))) {
            // Intentionally empty - only the (empty) archive itself is needed.
        }

        final String disposePath = "any-path";
        String[] parameters = { "-d", disposePath, "-j", f.getAbsolutePath() };

        frontend.savepoint(parameters);

        final String actualSavepointPath = disposeSavepointFuture.get();
        assertEquals(disposePath, actualSavepointPath);
    } finally {
        clusterClient.shutdown();
        restoreStdOutAndStdErr();
    }
}
示例26
/**
 * Runs {@code savepoint -d <path>} with a client whose dispose callback returns
 * {@code FutureUtils.completedExceptionally(testException)} and expects the call to throw
 * an exception in which the injected exception's message can be found.
 */
@Test
public void testDisposeSavepointFailure() throws Exception {
    replaceStdOutAndStdErr();

    final Exception injectedFailure = new Exception("expectedTestException");
    final DisposeSavepointClusterClient client = new DisposeSavepointClusterClient(
        (String path) -> FutureUtils.completedExceptionally(injectedFailure),
        getConfiguration());

    try {
        final CliFrontend frontend = new MockedCliFrontend(client);
        final String[] args = { "-d", "expectedSavepointPath" };

        try {
            frontend.savepoint(args);
            fail("Savepoint should have failed.");
        } catch (Exception thrown) {
            final String expectedMessage = injectedFailure.getMessage();
            assertTrue(ExceptionUtils.findThrowableWithMessage(thrown, expectedMessage).isPresent());
        }
    } finally {
        client.shutdown();
        restoreStdOutAndStdErr();
    }
}
示例27
/**
 * Calls {@code cancel <jobId>} and verifies that the client's {@code cancel} method was
 * invoked exactly once with some {@link JobID}.
 */
@Test
public void testCancel() throws Exception {
    final ClusterClient<String> client = createClusterClient();
    final String[] args = { new JobID().toString() };

    new MockedCliFrontend(client).cancel(args);

    Mockito.verify(client, times(1)).cancel(any(JobID.class));
}
示例28
/**
 * Calls {@code stop <jobId>} and verifies one {@code stopWithSavepoint} invocation with the
 * job id, a {@code false} flag and a {@code null} savepoint directory.
 */
@Test
public void testStopWithOnlyJobId() throws Exception {
    final JobID expectedJobId = new JobID();
    final ClusterClient<String> client = createClusterClient(null);

    final String[] args = { expectedJobId.toString() };
    new MockedCliFrontend(client).stop(args);

    Mockito.verify(client, times(1)).stopWithSavepoint(eq(expectedJobId), eq(false), isNull());
}
示例29
/**
 * Calls {@code stop} with the job id as the only argument (no {@code -p} flag is passed
 * despite the test name) and verifies one {@code stopWithSavepoint} invocation with the job
 * id, a {@code false} flag and a {@code null} savepoint directory.
 */
@Test
public void testStopWithDefaultSavepointDir() throws Exception {
    final JobID expectedJobId = new JobID();
    final ClusterClient<String> client = createClusterClient(null);
    final MockedCliFrontend frontend = new MockedCliFrontend(client);

    frontend.stop(new String[] { expectedJobId.toString() });

    Mockito.verify(client, times(1)).stopWithSavepoint(eq(expectedJobId), eq(false), isNull());
}
示例30
/**
 * Calls {@code stop -p test-target-dir <jobId>} and verifies one {@code stopWithSavepoint}
 * invocation with the job id, a {@code false} flag and "test-target-dir" as directory.
 */
@Test
public void testStopWithExplicitSavepointDir() throws Exception {
    final JobID expectedJobId = new JobID();
    final ClusterClient<String> client = createClusterClient(null);

    final String[] args = { "-p", "test-target-dir", expectedJobId.toString() };
    new MockedCliFrontend(client).stop(args);

    Mockito.verify(client, times(1))
        .stopWithSavepoint(eq(expectedJobId), eq(false), eq("test-target-dir"));
}