Java源码示例:org.apache.flink.runtime.clusterframework.ContainerSpecification
示例1
/**
* Generate a container specification as a TaskManager template.
*
* <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
* needs (such as JAR file, config file, ...) and all environment variables into a container specification.
* The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
* A lightweight HTTP server serves the artifacts to the fetcher.
*/
public static void applyOverlays(
Configuration configuration, ContainerSpecification containerSpec) throws IOException {
// create the overlays that will produce the specification
CompositeContainerOverlay overlay = new CompositeContainerOverlay(
FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(),
HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(),
HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(),
KeytabOverlay.newBuilder().fromEnvironment(configuration).build(),
Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(),
SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build()
);
// apply the overlays
overlay.configure(containerSpec);
}
示例2
/**
* Construct a Mesos URI.
*/
public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
checkNotNull(resolver);
checkNotNull(artifact);
Option<URL> url = resolver.resolve(artifact.dest);
if (url.isEmpty()) {
throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
}
return Protos.CommandInfo.URI.newBuilder()
.setValue(url.get().toExternalForm())
.setOutputFile(artifact.dest.toString())
.setExtract(artifact.extract)
.setCache(artifact.cachable)
.setExecutable(artifact.executable)
.build();
}
示例3
@Test
public void testConfigure() throws Exception {
File confDir = tempFolder.newFolder();
initConfDir(confDir);
HadoopConfOverlay overlay = new HadoopConfOverlay(confDir);
ContainerSpecification spec = new ContainerSpecification();
overlay.configure(spec);
assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR"));
assertEquals(TARGET_CONF_DIR.getPath(), spec.getFlinkConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null));
checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml"));
checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml"));
}
示例4
@Override
public void configure(ContainerSpecification container) throws IOException {
if(keystore != null) {
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
.setSource(keystore)
.setDest(TARGET_KEYSTORE_PATH)
.setCachable(false)
.build());
container.getDynamicConfiguration().setString(SecurityOptions.SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath());
}
if(truststore != null) {
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
.setSource(truststore)
.setDest(TARGET_TRUSTSTORE_PATH)
.setCachable(false)
.build());
container.getDynamicConfiguration().setString(SecurityOptions.SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath());
}
}
示例5
@Test
public void testConfigure() throws Exception {
File keystore = tempFolder.newFile();
File truststore = tempFolder.newFile();
SSLStoreOverlay overlay = new SSLStoreOverlay(keystore, truststore);
ContainerSpecification spec = new ContainerSpecification();
overlay.configure(spec);
assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.SSL_KEYSTORE));
checkArtifact(spec, TARGET_KEYSTORE_PATH);
assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.SSL_TRUSTSTORE));
checkArtifact(spec, TARGET_TRUSTSTORE_PATH);
}
示例6
@Test
public void testConfigure() throws Exception {
File confDir = tempFolder.newFolder();
initConfDir(confDir);
HadoopConfOverlay overlay = new HadoopConfOverlay(confDir);
ContainerSpecification spec = new ContainerSpecification();
overlay.configure(spec);
assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR"));
assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null));
checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml"));
checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml"));
}
示例7
/**
* Generate a container specification as a TaskManager template.
*
* <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
* needs (such as JAR file, config file, ...) and all environment variables into a container specification.
* The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
* A lightweight HTTP server serves the artifacts to the fetcher.
*/
public static void applyOverlays(
Configuration configuration, ContainerSpecification containerSpec) throws IOException {
// create the overlays that will produce the specification
CompositeContainerOverlay overlay = new CompositeContainerOverlay(
FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(),
HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(),
HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(),
KeytabOverlay.newBuilder().fromEnvironment(configuration).build(),
Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(),
SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build()
);
// apply the overlays
overlay.configure(containerSpec);
}
示例8
/**
* Construct a Mesos URI.
*/
public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
checkNotNull(resolver);
checkNotNull(artifact);
Option<URL> url = resolver.resolve(artifact.dest);
if (url.isEmpty()) {
throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
}
return Protos.CommandInfo.URI.newBuilder()
.setValue(url.get().toExternalForm())
.setOutputFile(artifact.dest.toString())
.setExtract(artifact.extract)
.setCache(artifact.cachable)
.setExecutable(artifact.executable)
.build();
}
示例9
@Override
public void configure(ContainerSpecification container) throws IOException {
container.getEnvironmentVariables().put(ENV_FLINK_HOME_DIR, TARGET_ROOT.toString());
// add the paths to the container specification.
addPathRecursively(flinkBinPath, TARGET_ROOT, container);
addPathRecursively(flinkConfPath, TARGET_ROOT, container);
addPathRecursively(flinkLibPath, TARGET_ROOT, container);
if (flinkPluginsPath.isDirectory()) {
addPathRecursively(flinkPluginsPath, TARGET_ROOT, container);
}
else {
LOG.warn("The plugins directory '" + flinkPluginsPath + "' doesn't exist.");
}
}
示例10
@Test
public void testConfigure() throws Exception {
final File userLibFolder = tempFolder.newFolder(DEFAULT_FLINK_USR_LIB_DIR);
final Path[] files = createPaths(
tempFolder.getRoot(),
"usrlib/job_a.jar",
"usrlib/lib/dep1.jar",
"usrlib/lib/dep2.jar");
final ContainerSpecification containerSpecification = new ContainerSpecification();
final UserLibOverlay overlay = UserLibOverlay.newBuilder().setUsrLibDirectory(userLibFolder).build();
overlay.configure(containerSpecification);
for (Path file : files) {
checkArtifact(containerSpecification, new Path(FlinkDistributionOverlay.TARGET_ROOT, file.toString()));
}
}
示例11
@Override
public void configure(ContainerSpecification container) throws IOException {
if(keystore != null) {
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
.setSource(keystore)
.setDest(TARGET_KEYSTORE_PATH)
.setCachable(false)
.build());
container.getDynamicConfiguration().setString(SecurityOptions.SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath());
}
if(truststore != null) {
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
.setSource(truststore)
.setDest(TARGET_TRUSTSTORE_PATH)
.setCachable(false)
.build());
container.getDynamicConfiguration().setString(SecurityOptions.SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath());
}
}
示例12
private void testConfigure(
File binFolder,
File libFolder,
File pluginsFolder,
File confFolder,
Path[] files) throws IOException {
ContainerSpecification containerSpecification = new ContainerSpecification();
FlinkDistributionOverlay overlay = new FlinkDistributionOverlay(
binFolder,
confFolder,
libFolder,
pluginsFolder);
overlay.configure(containerSpecification);
for(Path file : files) {
checkArtifact(containerSpecification, new Path(TARGET_ROOT, file.toString()));
}
}
示例13
@Test
public void testConfigure() throws Exception {
File confDir = tempFolder.newFolder();
initConfDir(confDir);
HadoopConfOverlay overlay = new HadoopConfOverlay(confDir);
ContainerSpecification spec = new ContainerSpecification();
overlay.configure(spec);
assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR"));
assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null));
checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml"));
checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml"));
}
示例14
@Test
public void testConfigure() throws Exception {
File keystore = tempFolder.newFile();
File truststore = tempFolder.newFile();
SSLStoreOverlay overlay = new SSLStoreOverlay(keystore, truststore);
ContainerSpecification spec = new ContainerSpecification();
overlay.configure(spec);
assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getFlinkConfiguration().getString(SecurityOptions.SSL_KEYSTORE));
checkArtifact(spec, TARGET_KEYSTORE_PATH);
assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getFlinkConfiguration().getString(SecurityOptions.SSL_TRUSTSTORE));
checkArtifact(spec, TARGET_TRUSTSTORE_PATH);
}
示例15
/**
* Generate a container specification as a TaskManager template.
*
* <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
* needs (such as JAR file, config file, ...) and all environment variables into a container specification.
* The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
* A lightweight HTTP server serves the artifacts to the fetcher.
*/
public static void applyOverlays(
Configuration configuration, ContainerSpecification containerSpec) throws IOException {
// create the overlays that will produce the specification
CompositeContainerOverlay overlay = new CompositeContainerOverlay(
FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(),
UserLibOverlay.newBuilder().setUsrLibDirectory(ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null)).build(),
HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(),
HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(),
KeytabOverlay.newBuilder().fromEnvironment(configuration).build(),
Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(),
SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build()
);
// apply the overlays
overlay.configure(containerSpec);
}
示例16
/**
* Construct a Mesos URI.
*/
public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
checkNotNull(resolver);
checkNotNull(artifact);
Option<URL> url = resolver.resolve(artifact.dest);
if (url.isEmpty()) {
throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
}
return Protos.CommandInfo.URI.newBuilder()
.setValue(url.get().toExternalForm())
.setOutputFile(artifact.dest.toString())
.setExtract(artifact.extract)
.setCache(artifact.cachable)
.setExecutable(artifact.executable)
.build();
}
示例17
@Test
public void launch_withNonDefaultConfiguration_forwardsConfigurationValues() {
final Configuration configuration = new Configuration();
configuration.setString(MesosOptions.MASTER_URL, "foobar");
final MemorySize memorySize = new MemorySize(1337L);
configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, memorySize);
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
final LaunchableTask launchableTask = new LaunchableMesosWorker(
ignored -> Option.empty(),
MesosTaskManagerParameters.create(configuration),
ContainerSpecification.from(configuration),
Protos.TaskID.newBuilder().setValue("test-task-id").build(),
MesosUtils.createMesosSchedulerConfiguration(configuration, "localhost"));
final Protos.TaskInfo taskInfo = launchableTask.launch(
Protos.SlaveID.newBuilder().setValue("test-slave-id").build(),
new MesosResourceAllocation(Collections.singleton(ports(range(1000, 2000)))));
assertThat(
taskInfo.getCommand().getValue(),
containsString(ContainerSpecification.createDynamicProperty(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), memorySize.toString())));
}
示例18
@Override
public void configure(ContainerSpecification container) throws IOException {
if(keystore != null) {
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
.setSource(keystore)
.setDest(TARGET_KEYSTORE_PATH)
.setCachable(false)
.build());
container.getFlinkConfiguration().setString(SecurityOptions.SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath());
}
if(truststore != null) {
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
.setSource(truststore)
.setDest(TARGET_TRUSTSTORE_PATH)
.setCachable(false)
.build());
container.getFlinkConfiguration().setString(SecurityOptions.SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath());
}
}
示例19
private void testConfigure(
File binFolder,
File libFolder,
File pluginsFolder,
File confFolder,
Path[] files) throws IOException {
ContainerSpecification containerSpecification = new ContainerSpecification();
FlinkDistributionOverlay overlay = new FlinkDistributionOverlay(
binFolder,
confFolder,
libFolder,
pluginsFolder);
overlay.configure(containerSpecification);
for (Path file : files) {
checkArtifact(containerSpecification, new Path(TARGET_ROOT, file.toString()));
}
}
示例20
public static ContainerSpecification createContainerSpec(Configuration configuration, Configuration dynamicProperties)
throws Exception {
// generate a container spec which conveys the artifacts/vars needed to launch a TM
ContainerSpecification spec = new ContainerSpecification();
// propagate the AM dynamic configuration to the TM
spec.getDynamicConfiguration().addAll(dynamicProperties);
applyOverlays(configuration, spec);
return spec;
}
示例21
@Test
public void testNoConf() throws Exception {
SSLStoreOverlay overlay = new SSLStoreOverlay(null, null);
ContainerSpecification containerSpecification = new ContainerSpecification();
overlay.configure(containerSpecification);
}
示例22
/**
* Configures an artifact server to serve the artifacts associated with a container specification.
* @param server the server to configure.
* @param container the container with artifacts to serve.
* @throws IOException if the artifacts cannot be accessed.
*/
static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
// serve the artifacts associated with the container environment
for (ContainerSpecification.Artifact artifact : container.getArtifacts()) {
server.addPath(artifact.source, artifact.dest);
}
}
示例23
@Override
public void configure(ContainerSpecification container) throws IOException {
if(keytab != null) {
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
.setSource(keytab)
.setDest(TARGET_PATH)
.setCachable(false)
.build());
container.getDynamicConfiguration().setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, TARGET_PATH.getPath());
}
}
示例24
@Override
public void configure(ContainerSpecification container) throws IOException {
if(ugi != null) {
// overlay the Hadoop user identity (w/ tokens)
container.getEnvironmentVariables().put("HADOOP_USER_NAME", ugi.getUserName());
}
}
示例25
@Override
public void configure(ContainerSpecification container) throws IOException {
if(krb5Conf != null) {
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
.setSource(krb5Conf)
.setDest(TARGET_PATH)
.setCachable(true)
.build());
container.getSystemProperties().setString(JAVA_SECURITY_KRB5_CONF, TARGET_PATH.getPath());
}
}
示例26
@Override
public void configure(ContainerSpecification container) throws IOException {
if(hadoopConfDir == null) {
return;
}
File coreSitePath = new File(hadoopConfDir, "core-site.xml");
File hdfsSitePath = new File(hadoopConfDir, "hdfs-site.xml");
container.getEnvironmentVariables().put("HADOOP_CONF_DIR", TARGET_CONF_DIR.toString());
container.getDynamicConfiguration().setString(ConfigConstants.PATH_HADOOP_CONFIG, TARGET_CONF_DIR.toString());
container.getArtifacts().add(ContainerSpecification.Artifact
.newBuilder()
.setSource(new Path(coreSitePath.toURI()))
.setDest(new Path(TARGET_CONF_DIR, coreSitePath.getName()))
.setCachable(true)
.build());
container.getArtifacts().add(ContainerSpecification.Artifact
.newBuilder()
.setSource(new Path(hdfsSitePath.toURI()))
.setDest(new Path(TARGET_CONF_DIR, hdfsSitePath.getName()))
.setCachable(true)
.build());
}
示例27
@Test
public void testConfigure() throws Exception {
File binFolder = tempFolder.newFolder("bin");
File libFolder = tempFolder.newFolder("lib");
File confFolder = tempFolder.newFolder("conf");
Path[] files = createPaths(
tempFolder.getRoot(),
"bin/config.sh",
"bin/taskmanager.sh",
"lib/foo.jar",
"lib/A/foo.jar",
"lib/B/foo.jar",
"lib/B/bar.jar");
ContainerSpecification containerSpecification = new ContainerSpecification();
FlinkDistributionOverlay overlay = new FlinkDistributionOverlay(
binFolder,
confFolder,
libFolder
);
overlay.configure(containerSpecification);
for(Path file : files) {
checkArtifact(containerSpecification, new Path(TARGET_ROOT, file.toString()));
}
}
示例28
/**
* Check that an artifact exists for the given remote path.
*/
protected static ContainerSpecification.Artifact checkArtifact(ContainerSpecification spec, Path remotePath) {
for(ContainerSpecification.Artifact artifact : spec.getArtifacts()) {
if(remotePath.equals(artifact.dest)) {
return artifact;
}
}
throw new AssertionError("no such artifact (" + remotePath + ")");
}
示例29
@Test
public void testNoConf() throws Exception {
HadoopUserOverlay overlay = new HadoopUserOverlay(null);
ContainerSpecification containerSpecification = new ContainerSpecification();
overlay.configure(containerSpecification);
}
示例30
@Test
public void testNoConf() throws Exception {
SSLStoreOverlay overlay = new SSLStoreOverlay(null, null);
ContainerSpecification containerSpecification = new ContainerSpecification();
overlay.configure(containerSpecification);
}