Java源码示例:org.apache.flink.runtime.testutils.MiniClusterResource
示例1
/**
 * One-time class fixture: starts a MiniDFSCluster whose base directory is a new folder obtained
 * from {@code tempFolder}, derives the {@code hdfs://} URI of its name node, and then starts a
 * Flink mini cluster with one TaskManager offering four slots.
 *
 * @throws Exception if the folder, the HDFS cluster or the Flink cluster cannot be created
 */
@BeforeClass
public static void setup() throws Exception {
    LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");

    // Point the HDFS mini cluster at a freshly created base directory.
    dataDir = tempFolder.newFolder();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());

    hdfsCluster = new MiniDFSCluster.Builder(conf).build();
    dfs = hdfsCluster.getFileSystem();

    // Build the URI from the name node's host and port.
    final String nameNodeAddress = NetUtils.hostAndPortToUrlString(
        hdfsCluster.getURI().getHost(),
        hdfsCluster.getNameNodePort());
    hdfsURI = "hdfs://" + nameNodeAddress + "/";

    final MiniClusterResourceConfiguration clusterSpec =
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(4)
            .setNumberTaskManagers(1)
            .build();
    miniClusterResource = new MiniClusterResource(clusterSpec);
    miniClusterResource.before();
}
示例2
/**
 * One-time class fixture: starts a Flink mini cluster with two TaskManagers of two slots each,
 * configured with the "filesystem" state backend and separate, newly created checkpoint and
 * savepoint directories.
 *
 * @throws Exception if a directory cannot be created or the cluster fails to start
 */
@BeforeClass
public static void setUp() throws Exception {
    final Configuration clusterConfig = new Configuration();

    // we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
    clusterConfig.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");

    // Checkpoint path
    final String checkpointUri = FOLDER.newFolder().getAbsoluteFile().toURI().toString();
    clusterConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointUri);

    // Savepoint path
    final String savepointUri = FOLDER.newFolder().getAbsoluteFile().toURI().toString();
    clusterConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointUri);

    // required as we otherwise run out of memory
    clusterConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");

    final MiniClusterResourceConfiguration clusterSpec =
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(clusterConfig)
            .setNumberSlotsPerTaskManager(2)
            .setNumberTaskManagers(2)
            .build();
    miniClusterResource = new MiniClusterResource(clusterSpec);
    miniClusterResource.before();
}
示例3
/**
 * One-time class fixture: creates the BLOB storage directory, publishes an unmodifiable
 * configuration (fixed-delay restart with one attempt, BLOB cleanup interval of 1) in
 * {@code configuration}, and starts a Flink mini cluster with one TaskManager and two slots.
 *
 * @throws Exception if the directory cannot be created or the cluster fails to start
 */
@BeforeClass
public static void setup() throws Exception {
    blobBaseDir = TEMPORARY_FOLDER.newFolder();

    final Configuration blobConfig = new Configuration();
    blobConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
    blobConfig.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
    blobConfig.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
    // BLOBs are deleted from BlobCache between 1s and 2s after last reference
    // -> the BlobCache may still have the BLOB or not (let's test both cases randomly)
    blobConfig.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);

    // Freeze the settings before sharing them through the static field.
    configuration = new UnmodifiableConfiguration(blobConfig);

    final MiniClusterResourceConfiguration clusterSpec =
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(2)
            .build();
    miniClusterResource = new MiniClusterResource(clusterSpec);
    miniClusterResource.before();
}
示例4
/**
 * One-time class fixture: starts a Flink mini cluster with two TaskManagers of two slots each,
 * configured with the "filesystem" state backend and separate, newly created checkpoint and
 * savepoint directories.
 *
 * @throws Exception if a directory cannot be created or the cluster fails to start
 */
@BeforeClass
public static void setUp() throws Exception {
    final Configuration clusterConfig = new Configuration();

    // we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
    clusterConfig.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");

    // Checkpoint path
    final String checkpointUri = FOLDER.newFolder().getAbsoluteFile().toURI().toString();
    clusterConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointUri);

    // Savepoint path
    final String savepointUri = FOLDER.newFolder().getAbsoluteFile().toURI().toString();
    clusterConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointUri);

    // required as we otherwise run out of memory
    clusterConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");

    final MiniClusterResourceConfiguration clusterSpec =
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(clusterConfig)
            .setNumberSlotsPerTaskManager(2)
            .setNumberTaskManagers(2)
            .build();
    miniClusterResource = new MiniClusterResource(clusterSpec);
    miniClusterResource.before();
}
示例5
/**
 * One-time class fixture: creates the BLOB storage directory, publishes an unmodifiable
 * configuration (fixed-delay restart with one attempt, BLOB cleanup interval of 1) in
 * {@code configuration}, and starts a Flink mini cluster with one TaskManager and two slots.
 *
 * @throws Exception if the directory cannot be created or the cluster fails to start
 */
@BeforeClass
public static void setup() throws Exception {
    blobBaseDir = TEMPORARY_FOLDER.newFolder();

    final Configuration blobConfig = new Configuration();
    blobConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
    blobConfig.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
    blobConfig.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
    // BLOBs are deleted from BlobCache between 1s and 2s after last reference
    // -> the BlobCache may still have the BLOB or not (let's test both cases randomly)
    blobConfig.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);

    // Freeze the settings before sharing them through the static field.
    configuration = new UnmodifiableConfiguration(blobConfig);

    final MiniClusterResourceConfiguration clusterSpec =
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(2)
            .build();
    miniClusterResource = new MiniClusterResource(clusterSpec);
    miniClusterResource.before();
}
示例6
/**
 * One-time class fixture: starts a Flink mini cluster with two TaskManagers of two slots each,
 * configured with the "filesystem" state backend, separate newly created checkpoint and
 * savepoint directories, and a managed memory size parsed from "80m".
 *
 * @throws Exception if a directory cannot be created or the cluster fails to start
 */
@BeforeClass
public static void setUp() throws Exception {
    final Configuration clusterConfig = new Configuration();

    // we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
    clusterConfig.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");

    // Checkpoint path
    final String checkpointUri = FOLDER.newFolder().getAbsoluteFile().toURI().toString();
    clusterConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointUri);

    // Savepoint path
    final String savepointUri = FOLDER.newFolder().getAbsoluteFile().toURI().toString();
    clusterConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointUri);

    // required as we otherwise run out of memory
    clusterConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("80m"));

    final MiniClusterResourceConfiguration clusterSpec =
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(clusterConfig)
            .setNumberSlotsPerTaskManager(2)
            .setNumberTaskManagers(2)
            .build();
    miniClusterResource = new MiniClusterResource(clusterSpec);
    miniClusterResource.before();
}
示例7
/**
 * One-time class fixture: creates the BLOB storage directory, publishes an unmodifiable
 * configuration (fixed-delay restart with one attempt, BLOB cleanup interval of 1) in
 * {@code configuration}, and starts a Flink mini cluster with one TaskManager and two slots.
 *
 * @throws Exception if the directory cannot be created or the cluster fails to start
 */
@BeforeClass
public static void setup() throws Exception {
    blobBaseDir = TEMPORARY_FOLDER.newFolder();

    final Configuration blobConfig = new Configuration();
    blobConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
    blobConfig.setString(RestartStrategyOptions.RESTART_STRATEGY, "fixeddelay");
    blobConfig.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
    // BLOBs are deleted from BlobCache between 1s and 2s after last reference
    // -> the BlobCache may still have the BLOB or not (let's test both cases randomly)
    blobConfig.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);

    // Freeze the settings before sharing them through the static field.
    configuration = new UnmodifiableConfiguration(blobConfig);

    final MiniClusterResourceConfiguration clusterSpec =
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(2)
            .build();
    miniClusterResource = new MiniClusterResource(clusterSpec);
    miniClusterResource.before();
}
示例8
/**
 * Manual exactly-once test against a real Kinesis stream.
 *
 * <p>Creates a uniquely named stream, starts a local Flink mini cluster, runs a producer and a
 * validating consumer thread against it, and waits up to two minutes for both to finish.
 * A producer or consumer error is rethrown as a {@link RuntimeException}; a timeout is only
 * logged as a failure.
 *
 * <p>Resources are released in reverse order of acquisition (Flink cluster, Kinesis stream,
 * Kinesis client), and each release happens in its own {@code finally} so that a failure while
 * releasing one resource cannot leak the others.
 *
 * @param args command line arguments; {@code accessKey}, {@code secretKey} and {@code region}
 *     are required
 * @throws Exception if setup fails, or if the producer or the consumer reported an error
 */
public static void main(String[] args) throws Exception {
    final ParameterTool pt = ParameterTool.fromArgs(args);
    LOG.info("Starting exactly once test");

    final String streamName = "flink-test-" + UUID.randomUUID().toString();

    final String accessKey = pt.getRequired("accessKey");
    final String secretKey = pt.getRequired("secretKey");
    final String region = pt.getRequired("region");

    Properties configProps = new Properties();
    configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
    configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
    configProps.setProperty(AWSConfigConstants.AWS_REGION, region);
    AmazonKinesis client = AWSUtil.createKinesisClient(configProps);

    try {
        // create a stream for the test:
        client.createStream(streamName, 1);

        try {
            // wait until stream has been created
            DescribeStreamResult status = client.describeStream(streamName);
            LOG.info("status {}" , status);
            while (!"ACTIVE".equals(status.getStreamDescription().getStreamStatus())) {
                // Sleep before polling again so no time is wasted once the stream is ACTIVE.
                Thread.sleep(1000);
                status = client.describeStream(streamName);
                LOG.info("Status of stream {}", status);
            }

            final Configuration flinkConfig = new Configuration();
            flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
            flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

            MiniClusterResource flink = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder()
                .setNumberTaskManagers(1)
                .setNumberSlotsPerTaskManager(8)
                .setConfiguration(flinkConfig)
                .build());
            flink.before();

            try {
                final int flinkPort = flink.getRestAddres().getPort();

                final AtomicReference<Throwable> producerError = new AtomicReference<>();
                Thread producerThread = KinesisEventsGeneratorProducerThread.create(
                    TOTAL_EVENT_COUNT, 2,
                    accessKey, secretKey, region, streamName,
                    producerError, flinkPort, flinkConfig);
                producerThread.start();

                final AtomicReference<Throwable> consumerError = new AtomicReference<>();
                Thread consumerThread = ExactlyOnceValidatingConsumerThread.create(
                    TOTAL_EVENT_COUNT, 200, 2, 500, 500,
                    accessKey, secretKey, region, streamName,
                    consumerError, flinkPort, flinkConfig);
                consumerThread.start();

                boolean deadlinePassed = false;
                long deadline = System.currentTimeMillis() + (1000 * 2 * 60); // wait at most for two minutes
                // wait until both producer and consumer finishes, or an unexpected error is thrown
                while ((consumerThread.isAlive() || producerThread.isAlive()) &&
                    (producerError.get() == null && consumerError.get() == null)) {
                    Thread.sleep(1000);
                    if (System.currentTimeMillis() >= deadline) {
                        LOG.warn("Deadline passed");
                        deadlinePassed = true;
                        break; // enough waiting
                    }
                }

                if (producerThread.isAlive()) {
                    producerThread.interrupt();
                }

                if (consumerThread.isAlive()) {
                    consumerThread.interrupt();
                }

                if (producerError.get() != null) {
                    LOG.info("+++ TEST failed! +++");
                    throw new RuntimeException("Producer failed", producerError.get());
                }
                if (consumerError.get() != null) {
                    LOG.info("+++ TEST failed! +++");
                    throw new RuntimeException("Consumer failed", consumerError.get());
                }

                // NOTE(review): a timeout is only logged, it does not make main() throw.
                if (!deadlinePassed) {
                    LOG.info("+++ TEST passed! +++");
                } else {
                    LOG.info("+++ TEST failed! +++");
                }
            } finally {
                // stopping flink
                flink.after();
            }
        } finally {
            // Runs even if waiting for the stream or starting Flink failed.
            client.deleteStream(streamName);
        }
    } finally {
        // Runs even if creating or deleting the stream failed.
        client.shutdown();
    }
}
示例9
/**
 * One-time class fixture for the secure (Kerberos) test environment.
 *
 * <p>Steps, in order: prepares the secure test environment, installs a testing security
 * context built from the keytab and principal, writes the Hadoop configuration to
 * {@code hdfs-site.xml} inside {@code dataDir} and exposes that directory through
 * {@code HADOOP_CONF_DIR}, starts a MiniDFSCluster, and finally starts a Flink mini cluster
 * with one TaskManager and four slots.
 *
 * @throws Exception if any part of the environment cannot be set up
 */
@BeforeClass
public static void setup() throws Exception {

    skipIfHadoopVersionIsNotAppropriate();

    LOG.info("starting secure cluster environment for testing");

    dataDir = tempFolder.newFolder();

    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());

    SecureTestEnvironment.prepare(tempFolder);

    populateSecureConfigurations();

    Configuration flinkConfig = new Configuration();
    flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
        SecureTestEnvironment.getTestKeytab());
    flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
        SecureTestEnvironment.getHadoopServicePrincipal());

    SecurityConfiguration ctx =
        new SecurityConfiguration(
            flinkConfig,
            Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, conf)));
    try {
        TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
    } catch (Exception e) {
        // The cause carries the reason; exception messages do not expand "{}" placeholders.
        throw new RuntimeException("Exception occurred while setting up secure test context.", e);
    }

    File hdfsSiteXML = new File(dataDir.getAbsolutePath() + "/hdfs-site.xml");

    // try-with-resources closes the writer even if writing the XML fails.
    try (FileWriter writer = new FileWriter(hdfsSiteXML)) {
        conf.writeXml(writer);
        writer.flush();
    }

    // Make the directory containing the generated hdfs-site.xml the Hadoop configuration directory.
    Map<String, String> map = new HashMap<>(System.getenv());
    map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath());
    TestBaseUtils.setEnv(map);

    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
    builder.checkDataNodeAddrConfig(true);
    builder.checkDataNodeHostConfig(true);
    hdfsCluster = builder.build();

    dfs = hdfsCluster.getFileSystem();

    hdfsURI = "hdfs://"
        + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
        + "/";

    Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled();

    miniClusterResource = new MiniClusterResource(
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(4)
            .build());

    miniClusterResource.before();
}
示例10
/**
 * Submits a run request for an empty (zero-byte) jar placed in the web upload directory and
 * verifies that the request fails with a {@code RestClientException} whose message shows the
 * jar was found and the job submission was attempted.
 *
 * <p>The test fails explicitly if the request succeeds; otherwise none of the assertions
 * in the catch block would run and the test would pass without verifying anything.
 *
 * @throws Exception if the cluster cannot be started or an unexpected error occurs
 */
@Test
public void testRunJar() throws Exception {
    Path uploadDir = TMP.newFolder().toPath();

    Path actualUploadDir = uploadDir.resolve("flink-web-upload");
    Files.createDirectory(actualUploadDir);

    Path emptyJar = actualUploadDir.resolve("empty.jar");
    Files.createFile(emptyJar);

    Configuration config = new Configuration();
    config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString());

    MiniClusterResource clusterResource = new MiniClusterResource(
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(config)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(1)
            .build());
    clusterResource.before();

    try {
        Configuration clientConfig = clusterResource.getClientConfiguration();
        RestClient client = new RestClient(RestClientConfiguration.fromConfiguration(clientConfig), TestingUtils.defaultExecutor());

        try {
            JarRunHeaders headers = JarRunHeaders.getInstance();
            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
            parameters.jarIdPathParameter.resolve(emptyJar.getFileName().toString());

            String host = clientConfig.getString(RestOptions.ADDRESS);
            int port = clientConfig.getInteger(RestOptions.PORT);

            try {
                client.sendRequest(host, port, headers, parameters, new JarRunRequestBody())
                    .get();
                // AssertionError is an Error, so the catch (Exception) below does not swallow it.
                throw new AssertionError("Expected the run request for the empty jar to fail.");
            } catch (Exception e) {
                Optional<RestClientException> expected = ExceptionUtils.findThrowable(e, RestClientException.class);
                if (expected.isPresent()) {
                    final String message = expected.get().getMessage();
                    // implies the job was actually submitted
                    assertTrue(message.contains("ProgramInvocationException"));
                    // original cause is preserved in stack trace
                    assertThat(message, containsString("ZipException"));
                    // implies the jar was registered for the job graph (otherwise the jar name would not occur in the exception)
                    // implies the jar was uploaded (otherwise the file would not be found at all)
                    assertTrue(message.contains("empty.jar'. zip file is empty"));
                } else {
                    throw e;
                }
            }
        } finally {
            client.shutdown(Time.milliseconds(10));
        }
    } finally {
        clusterResource.after();
    }
}
示例11
/**
 * Manual exactly-once test against a real Kinesis stream.
 *
 * <p>Creates a uniquely named stream, starts a local Flink mini cluster, runs a producer and a
 * validating consumer thread against it, and waits up to two minutes for both to finish.
 * A producer or consumer error is rethrown as a {@link RuntimeException}; a timeout is only
 * logged as a failure.
 *
 * <p>Resources are released in reverse order of acquisition (Flink cluster, Kinesis stream,
 * Kinesis client), and each release happens in its own {@code finally} so that a failure while
 * releasing one resource cannot leak the others.
 *
 * @param args command line arguments; {@code accessKey}, {@code secretKey} and {@code region}
 *     are required
 * @throws Exception if setup fails, or if the producer or the consumer reported an error
 */
public static void main(String[] args) throws Exception {
    final ParameterTool pt = ParameterTool.fromArgs(args);
    LOG.info("Starting exactly once test");

    final String streamName = "flink-test-" + UUID.randomUUID().toString();

    final String accessKey = pt.getRequired("accessKey");
    final String secretKey = pt.getRequired("secretKey");
    final String region = pt.getRequired("region");

    Properties configProps = new Properties();
    configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
    configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
    configProps.setProperty(AWSConfigConstants.AWS_REGION, region);
    AmazonKinesis client = AWSUtil.createKinesisClient(configProps);

    try {
        // create a stream for the test:
        client.createStream(streamName, 1);

        try {
            // wait until stream has been created
            DescribeStreamResult status = client.describeStream(streamName);
            LOG.info("status {}" , status);
            while (!"ACTIVE".equals(status.getStreamDescription().getStreamStatus())) {
                // Sleep before polling again so no time is wasted once the stream is ACTIVE.
                Thread.sleep(1000);
                status = client.describeStream(streamName);
                LOG.info("Status of stream {}", status);
            }

            final Configuration flinkConfig = new Configuration();
            flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
            flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

            MiniClusterResource flink = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder()
                .setNumberTaskManagers(1)
                .setNumberSlotsPerTaskManager(8)
                .setConfiguration(flinkConfig)
                .build());
            flink.before();

            try {
                final int flinkPort = flink.getRestAddres().getPort();

                final AtomicReference<Throwable> producerError = new AtomicReference<>();
                Thread producerThread = KinesisEventsGeneratorProducerThread.create(
                    TOTAL_EVENT_COUNT, 2,
                    accessKey, secretKey, region, streamName,
                    producerError, flinkPort, flinkConfig);
                producerThread.start();

                final AtomicReference<Throwable> consumerError = new AtomicReference<>();
                Thread consumerThread = ExactlyOnceValidatingConsumerThread.create(
                    TOTAL_EVENT_COUNT, 200, 2, 500, 500,
                    accessKey, secretKey, region, streamName,
                    consumerError, flinkPort, flinkConfig);
                consumerThread.start();

                boolean deadlinePassed = false;
                long deadline = System.currentTimeMillis() + (1000 * 2 * 60); // wait at most for two minutes
                // wait until both producer and consumer finishes, or an unexpected error is thrown
                while ((consumerThread.isAlive() || producerThread.isAlive()) &&
                    (producerError.get() == null && consumerError.get() == null)) {
                    Thread.sleep(1000);
                    if (System.currentTimeMillis() >= deadline) {
                        LOG.warn("Deadline passed");
                        deadlinePassed = true;
                        break; // enough waiting
                    }
                }

                if (producerThread.isAlive()) {
                    producerThread.interrupt();
                }

                if (consumerThread.isAlive()) {
                    consumerThread.interrupt();
                }

                if (producerError.get() != null) {
                    LOG.info("+++ TEST failed! +++");
                    throw new RuntimeException("Producer failed", producerError.get());
                }
                if (consumerError.get() != null) {
                    LOG.info("+++ TEST failed! +++");
                    throw new RuntimeException("Consumer failed", consumerError.get());
                }

                // NOTE(review): a timeout is only logged, it does not make main() throw.
                if (!deadlinePassed) {
                    LOG.info("+++ TEST passed! +++");
                } else {
                    LOG.info("+++ TEST failed! +++");
                }
            } finally {
                // stopping flink
                flink.after();
            }
        } finally {
            // Runs even if waiting for the stream or starting Flink failed.
            client.deleteStream(streamName);
        }
    } finally {
        // Runs even if creating or deleting the stream failed.
        client.shutdown();
    }
}
示例12
/**
 * Submits a run request for an empty (zero-byte) jar placed in the web upload directory and
 * verifies that the request fails with a {@code RestClientException} whose message shows the
 * jar was found and the job submission was attempted.
 *
 * <p>The test fails explicitly if the request succeeds; otherwise none of the assertions
 * in the catch block would run and the test would pass without verifying anything.
 *
 * @throws Exception if the cluster cannot be started or an unexpected error occurs
 */
@Test
public void testRunJar() throws Exception {
    Path uploadDir = TMP.newFolder().toPath();

    Path actualUploadDir = uploadDir.resolve("flink-web-upload");
    Files.createDirectory(actualUploadDir);

    Path emptyJar = actualUploadDir.resolve("empty.jar");
    Files.createFile(emptyJar);

    Configuration config = new Configuration();
    config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString());

    MiniClusterResource clusterResource = new MiniClusterResource(
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(config)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(1)
            .build());
    clusterResource.before();

    try {
        Configuration clientConfig = clusterResource.getClientConfiguration();
        RestClient client = new RestClient(RestClientConfiguration.fromConfiguration(clientConfig), TestingUtils.defaultExecutor());

        try {
            JarRunHeaders headers = JarRunHeaders.getInstance();
            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
            parameters.jarIdPathParameter.resolve(emptyJar.getFileName().toString());

            String host = clientConfig.getString(RestOptions.ADDRESS);
            int port = clientConfig.getInteger(RestOptions.PORT);

            try {
                client.sendRequest(host, port, headers, parameters, new JarRunRequestBody())
                    .get();
                // AssertionError is an Error, so the catch (Exception) below does not swallow it.
                throw new AssertionError("Expected the run request for the empty jar to fail.");
            } catch (Exception e) {
                Optional<RestClientException> expected = ExceptionUtils.findThrowable(e, RestClientException.class);
                if (expected.isPresent()) {
                    final String message = expected.get().getMessage();
                    // implies the job was actually submitted
                    assertTrue(message.contains("ProgramInvocationException"));
                    // original cause is preserved in stack trace
                    assertThat(message, containsString("ZipException: zip file is empty"));
                    // implies the jar was registered for the job graph (otherwise the jar name would not occur in the exception)
                    // implies the jar was uploaded (otherwise the file would not be found at all)
                    assertTrue(message.contains("empty.jar"));
                } else {
                    throw e;
                }
            }
        } finally {
            client.shutdown(Time.milliseconds(10));
        }
    } finally {
        clusterResource.after();
    }
}
示例13
/**
 * Manual exactly-once test against a real Kinesis stream.
 *
 * <p>Creates a uniquely named stream, starts a local Flink mini cluster, runs a producer and a
 * validating consumer thread against it, and waits up to two minutes for both to finish.
 * A producer or consumer error is rethrown as a {@link RuntimeException}; a timeout is only
 * logged as a failure.
 *
 * <p>Resources are released in reverse order of acquisition (Flink cluster, Kinesis stream,
 * Kinesis client), and each release happens in its own {@code finally} so that a failure while
 * releasing one resource cannot leak the others.
 *
 * @param args command line arguments; {@code accessKey}, {@code secretKey} and {@code region}
 *     are required
 * @throws Exception if setup fails, or if the producer or the consumer reported an error
 */
public static void main(String[] args) throws Exception {
    final ParameterTool pt = ParameterTool.fromArgs(args);
    LOG.info("Starting exactly once test");

    final String streamName = "flink-test-" + UUID.randomUUID().toString();

    final String accessKey = pt.getRequired("accessKey");
    final String secretKey = pt.getRequired("secretKey");
    final String region = pt.getRequired("region");

    Properties configProps = new Properties();
    configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
    configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
    configProps.setProperty(AWSConfigConstants.AWS_REGION, region);
    AmazonKinesis client = AWSUtil.createKinesisClient(configProps);

    try {
        // create a stream for the test:
        client.createStream(streamName, 1);

        try {
            // wait until stream has been created
            DescribeStreamResult status = client.describeStream(streamName);
            LOG.info("status {}" , status);
            while (!"ACTIVE".equals(status.getStreamDescription().getStreamStatus())) {
                // Sleep before polling again so no time is wasted once the stream is ACTIVE.
                Thread.sleep(1000);
                status = client.describeStream(streamName);
                LOG.info("Status of stream {}", status);
            }

            final Configuration flinkConfig = new Configuration();
            flinkConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("16m"));
            flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

            MiniClusterResource flink = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder()
                .setNumberTaskManagers(1)
                .setNumberSlotsPerTaskManager(8)
                .setConfiguration(flinkConfig)
                .build());
            flink.before();

            try {
                final int flinkPort = flink.getRestAddres().getPort();

                final AtomicReference<Throwable> producerError = new AtomicReference<>();
                Thread producerThread = KinesisEventsGeneratorProducerThread.create(
                    TOTAL_EVENT_COUNT, 2,
                    accessKey, secretKey, region, streamName,
                    producerError, flinkPort, flinkConfig);
                producerThread.start();

                final AtomicReference<Throwable> consumerError = new AtomicReference<>();
                Thread consumerThread = ExactlyOnceValidatingConsumerThread.create(
                    TOTAL_EVENT_COUNT, 200, 2, 500, 500,
                    accessKey, secretKey, region, streamName,
                    consumerError, flinkPort, flinkConfig);
                consumerThread.start();

                boolean deadlinePassed = false;
                long deadline = System.currentTimeMillis() + (1000 * 2 * 60); // wait at most for two minutes
                // wait until both producer and consumer finishes, or an unexpected error is thrown
                while ((consumerThread.isAlive() || producerThread.isAlive()) &&
                    (producerError.get() == null && consumerError.get() == null)) {
                    Thread.sleep(1000);
                    if (System.currentTimeMillis() >= deadline) {
                        LOG.warn("Deadline passed");
                        deadlinePassed = true;
                        break; // enough waiting
                    }
                }

                if (producerThread.isAlive()) {
                    producerThread.interrupt();
                }

                if (consumerThread.isAlive()) {
                    consumerThread.interrupt();
                }

                if (producerError.get() != null) {
                    LOG.info("+++ TEST failed! +++");
                    throw new RuntimeException("Producer failed", producerError.get());
                }
                if (consumerError.get() != null) {
                    LOG.info("+++ TEST failed! +++");
                    throw new RuntimeException("Consumer failed", consumerError.get());
                }

                // NOTE(review): a timeout is only logged, it does not make main() throw.
                if (!deadlinePassed) {
                    LOG.info("+++ TEST passed! +++");
                } else {
                    LOG.info("+++ TEST failed! +++");
                }
            } finally {
                // stopping flink
                flink.after();
            }
        } finally {
            // Runs even if waiting for the stream or starting Flink failed.
            client.deleteStream(streamName);
        }
    } finally {
        // Runs even if creating or deleting the stream failed.
        client.shutdown();
    }
}