Java源码示例:org.apache.flink.configuration.AkkaOptions
示例1
/**
 * Runs a small streaming job: integers produced by a {@code CustomInputFormat} are paired with
 * half of their value and the pairs are handed to a {@code NoOpSink}.
 *
 * <p>NOTE(review): the {@code Configuration} built at the top is never passed to the execution
 * environment inside this method — confirm whether that is intentional.
 *
 * @param args command line arguments (not read)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final Configuration configuration = new Configuration();
    configuration.setString(AkkaOptions.ASK_TIMEOUT, "5 s");

    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.getConfig().disableSysoutLogging();

    final DataStream<Integer> source = environment.createInput(new CustomInputFormat());

    // Kept as an anonymous class (not a lambda) so the generic Tuple2 signature stays visible.
    final MapFunction<Integer, Tuple2<Integer, Double>> pairWithHalf =
        new MapFunction<Integer, Tuple2<Integer, Double>>() {
            @Override
            public Tuple2<Integer, Double> map(Integer value) throws Exception {
                return new Tuple2<>(value, value * 0.5);
            }
        };

    source.map(pairWithHalf).addSink(new NoOpSink());

    environment.execute();
}
示例2
/**
 * Builds the cluster configuration used by this test.
 *
 * <p>Sets the managed memory size, caps the network buffer memory and derives the Akka frame size
 * from {@code MAX_MEM_STATE_SIZE}. When {@code zkServer} is set, ZooKeeper high availability is
 * switched on and a freshly created temporary folder is used as the HA storage path.
 *
 * <p>NOTE(review): the temporary folder is not deleted by this method; its URI is stored in the
 * returned configuration, so it has to outlive the call — confirm who cleans it up.
 *
 * @return the populated configuration
 * @throws IOException if the temporary HA storage folder cannot be created
 */
protected Configuration createClusterConfig() throws IOException {
    final Configuration config = new Configuration();
    config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
    // the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
    config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
    config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");

    if (zkServer != null) {
        // The HA directory is only referenced in ZooKeeper mode, so it is only created here;
        // previously a temporary folder was created (and left behind) even when it was unused.
        final TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        final File haDir = temporaryFolder.newFolder();

        config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
    }

    return config;
}
示例3
/**
 * Builds the RPC URL of an endpoint, picking the transport protocol from the configuration.
 *
 * <p>The SSL protocol variant is chosen only when both {@code AkkaOptions.SSL_ENABLED} is set and
 * {@code SSLUtils.isInternalSSLEnabled(config)} reports true; otherwise plain TCP is used. The
 * actual URL is produced by the overload that takes the protocol explicitly.
 *
 * @param hostname The hostname or address where the target RPC service is listening.
 * @param port The port where the target RPC service is listening.
 * @param endpointName The name of the RPC endpoint.
 * @param addressResolution Whether to try address resolution of the given hostname or not.
 *                          This allows failing fast in case the hostname cannot be resolved.
 * @param config The configuration from which to deduce further settings; must not be null.
 *
 * @return The RPC URL of the specified RPC endpoint.
 * @throws UnknownHostException declared by this method; the delegate overload is the only call
 *                              here that can plausibly raise it — confirm against that overload.
 */
public static String getRpcUrl(
        String hostname,
        int port,
        String endpointName,
        HighAvailabilityServicesUtils.AddressResolution addressResolution,
        Configuration config) throws UnknownHostException {

    checkNotNull(config, "config is null");

    final AkkaProtocol protocol;
    if (config.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.isInternalSSLEnabled(config)) {
        protocol = AkkaProtocol.SSL_TCP;
    } else {
        protocol = AkkaProtocol.TCP;
    }

    return getRpcUrl(hostname, port, endpointName, addressResolution, protocol);
}
示例4
/**
 * Creates a {@code SlotManagerConfiguration} from the given configuration.
 *
 * <p>The RPC timeout is parsed from {@code AkkaOptions.ASK_TIMEOUT}; the slot request timeout comes
 * from {@code getSlotRequestTimeout(configuration)}; the task manager timeout and the
 * "release when result consumed" flag are read from {@code ResourceManagerOptions}.
 *
 * @param configuration the configuration to read from
 * @return the slot manager configuration assembled from the values above
 * @throws ConfigurationException if parsing the ask timeout raises a {@code NumberFormatException}
 */
public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
    final String askTimeoutText = configuration.getString(AkkaOptions.ASK_TIMEOUT);

    final Time rpcTimeout;
    try {
        final long askTimeoutMillis = Duration.apply(askTimeoutText).toMillis();
        rpcTimeout = Time.milliseconds(askTimeoutMillis);
    } catch (NumberFormatException e) {
        throw new ConfigurationException("Could not parse the resource manager's timeout " +
            "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
    }

    final Time slotRequestTimeout = getSlotRequestTimeout(configuration);

    final long taskManagerTimeoutMillis = configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT);
    final Time taskManagerTimeout = Time.milliseconds(taskManagerTimeoutMillis);

    final boolean releaseOnlyAfterResultConsumed =
        configuration.getBoolean(ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);

    return new SlotManagerConfiguration(
        rpcTimeout,
        slotRequestTimeout,
        taskManagerTimeout,
        releaseOnlyAfterResultConsumed);
}
示例5
/**
 * Prepares the fixtures shared by the tests: a mocked {@code PackagedProgram} that returns a
 * trivial plan (sequence 1..1000 into a discarding output) and a configuration that points at
 * {@code localhost} on a free port with the default ask timeout.
 */
@Before
public void setUp() throws Exception {
    // Build a minimal batch plan that the mocked program can hand out.
    final ExecutionEnvironment localEnvironment = ExecutionEnvironment.createLocalEnvironment();
    localEnvironment.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
    final Plan programPlan = localEnvironment.createProgramPlan();
    final JobWithJars planWithJars =
        new JobWithJars(programPlan, Collections.<URL>emptyList(), Collections.<URL>emptyList());

    program = mock(PackagedProgram.class);
    when(program.getPlanWithJars()).thenReturn(planWithJars);

    final int jobManagerPort = NetUtils.getAvailablePort();

    config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setInteger(JobManagerOptions.PORT, jobManagerPort);
    config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
}
示例6
/**
 * Runs a small streaming job: integers produced by a {@code CustomInputFormat} are paired with
 * half of their value and the pairs are handed to a {@code DiscardingSink}.
 *
 * <p>NOTE(review): the {@code Configuration} built at the top is never passed to the execution
 * environment inside this method — confirm whether that is intentional.
 *
 * @param args command line arguments (not read)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final Configuration configuration = new Configuration();
    configuration.setString(AkkaOptions.ASK_TIMEOUT, "5 s");

    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.getConfig().disableSysoutLogging();

    final DataStream<Integer> source = environment.createInput(new CustomInputFormat());

    // Kept as an anonymous class (not a lambda) so the generic Tuple2 signature stays visible.
    final MapFunction<Integer, Tuple2<Integer, Double>> pairWithHalf =
        new MapFunction<Integer, Tuple2<Integer, Double>>() {
            @Override
            public Tuple2<Integer, Double> map(Integer value) throws Exception {
                return new Tuple2<>(value, value * 0.5);
            }
        };

    source.map(pairWithHalf).addSink(new DiscardingSink<>());

    environment.execute();
}
示例7
/**
 * Builds the cluster configuration used by this test.
 *
 * <p>Sets the managed memory size, caps the network buffer memory and derives the Akka frame size
 * from {@code MAX_MEM_STATE_SIZE}. When {@code zkServer} is set, ZooKeeper high availability is
 * switched on and a freshly created temporary folder is used as the HA storage path.
 *
 * <p>NOTE(review): the temporary folder is not deleted by this method; its URI is stored in the
 * returned configuration, so it has to outlive the call — confirm who cleans it up.
 *
 * @return the populated configuration
 * @throws IOException if the temporary HA storage folder cannot be created
 */
protected Configuration createClusterConfig() throws IOException {
    final Configuration config = new Configuration();
    config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
    // the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
    config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
    config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");

    if (zkServer != null) {
        // The HA directory is only referenced in ZooKeeper mode, so it is only created here;
        // previously a temporary folder was created (and left behind) even when it was unused.
        final TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        final File haDir = temporaryFolder.newFolder();

        config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
    }

    return config;
}
示例8
/**
 * Builds the RPC URL of an endpoint, picking the transport protocol from the configuration.
 *
 * <p>The SSL protocol variant is chosen only when both {@code AkkaOptions.SSL_ENABLED} is set and
 * {@code SSLUtils.isInternalSSLEnabled(config)} reports true; otherwise plain TCP is used. The
 * actual URL is produced by the overload that takes the protocol explicitly.
 *
 * @param hostname The hostname or address where the target RPC service is listening.
 * @param port The port where the target RPC service is listening.
 * @param endpointName The name of the RPC endpoint.
 * @param addressResolution Whether to try address resolution of the given hostname or not.
 *                          This allows failing fast in case the hostname cannot be resolved.
 * @param config The configuration from which to deduce further settings; must not be null.
 *
 * @return The RPC URL of the specified RPC endpoint.
 * @throws UnknownHostException declared by this method; the delegate overload is the only call
 *                              here that can plausibly raise it — confirm against that overload.
 */
public static String getRpcUrl(
        String hostname,
        int port,
        String endpointName,
        HighAvailabilityServicesUtils.AddressResolution addressResolution,
        Configuration config) throws UnknownHostException {

    checkNotNull(config, "config is null");

    final AkkaProtocol protocol;
    if (config.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.isInternalSSLEnabled(config)) {
        protocol = AkkaProtocol.SSL_TCP;
    } else {
        protocol = AkkaProtocol.TCP;
    }

    return getRpcUrl(hostname, port, endpointName, addressResolution, protocol);
}
示例9
/**
 * Creates a {@code SlotManagerConfiguration} from the given configuration.
 *
 * <p>The RPC timeout is parsed from {@code AkkaOptions.ASK_TIMEOUT}; the slot request timeout comes
 * from {@code getSlotRequestTimeout(configuration)}; the task manager timeout and the
 * "release when result consumed" flag are read from {@code ResourceManagerOptions}.
 *
 * @param configuration the configuration to read from
 * @return the slot manager configuration assembled from the values above
 * @throws ConfigurationException if parsing the ask timeout raises a {@code NumberFormatException}
 */
public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
    final String askTimeoutText = configuration.getString(AkkaOptions.ASK_TIMEOUT);

    final Time rpcTimeout;
    try {
        final long askTimeoutMillis = Duration.apply(askTimeoutText).toMillis();
        rpcTimeout = Time.milliseconds(askTimeoutMillis);
    } catch (NumberFormatException e) {
        throw new ConfigurationException("Could not parse the resource manager's timeout " +
            "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
    }

    final Time slotRequestTimeout = getSlotRequestTimeout(configuration);

    final long taskManagerTimeoutMillis = configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT);
    final Time taskManagerTimeout = Time.milliseconds(taskManagerTimeoutMillis);

    final boolean releaseOnlyAfterResultConsumed =
        configuration.getBoolean(ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);

    return new SlotManagerConfiguration(
        rpcTimeout,
        slotRequestTimeout,
        taskManagerTimeout,
        releaseOnlyAfterResultConsumed);
}
示例10
/**
 * Prepares the fixtures shared by the tests: a mocked {@code PackagedProgram} that returns a
 * trivial plan (sequence 1..1000 into a discarding output) and a configuration that points at
 * {@code localhost} on a free port with the default ask timeout.
 */
@Before
public void setUp() throws Exception {
    // Build a minimal batch plan that the mocked program can hand out.
    final ExecutionEnvironment localEnvironment = ExecutionEnvironment.createLocalEnvironment();
    localEnvironment.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
    final Plan programPlan = localEnvironment.createProgramPlan();
    final JobWithJars planWithJars =
        new JobWithJars(programPlan, Collections.<URL>emptyList(), Collections.<URL>emptyList());

    program = mock(PackagedProgram.class);
    when(program.getPlanWithJars()).thenReturn(planWithJars);

    final int jobManagerPort = NetUtils.getAvailablePort();

    config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setInteger(JobManagerOptions.PORT, jobManagerPort);
    config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
}
示例11
/**
 * Runs a small streaming job: integers produced by a {@code CustomInputFormat} are paired with
 * half of their value and the pairs are handed to a {@code DiscardingSink}.
 *
 * <p>NOTE(review): the {@code Configuration} built at the top is never passed to the execution
 * environment inside this method — confirm whether that is intentional.
 *
 * @param args command line arguments (not read)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final Configuration configuration = new Configuration();
    configuration.setString(AkkaOptions.ASK_TIMEOUT, "5 s");

    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    final DataStream<Integer> source = environment.createInput(new CustomInputFormat());

    // Kept as an anonymous class (not a lambda) so the generic Tuple2 signature stays visible.
    final MapFunction<Integer, Tuple2<Integer, Double>> pairWithHalf =
        new MapFunction<Integer, Tuple2<Integer, Double>>() {
            @Override
            public Tuple2<Integer, Double> map(Integer value) throws Exception {
                return new Tuple2<>(value, value * 0.5);
            }
        };

    source.map(pairWithHalf).addSink(new DiscardingSink<>());

    environment.execute();
}
示例12
/**
 * Checks that {@code -D key=value} arguments given on the command line show up in the
 * configuration returned by {@code applyCommandLineOptionsToConfiguration}.
 */
@Test
public void testDynamicProperties() throws Exception {
    final FlinkYarnSessionCli yarnCli = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "",
        "",
        false);

    final Options cliOptions = new Options();
    yarnCli.addGeneralOptions(cliOptions);
    yarnCli.addRunOptions(cliOptions);

    final CommandLineParser commandLineParser = new DefaultParser();
    final String[] arguments = new String[]{
        "run", "-j", "fake.jar",
        "-D", AkkaOptions.ASK_TIMEOUT.key() + "=5 min",
        "-D", CoreOptions.FLINK_JVM_OPTIONS.key() + "=-DappName=foobar",
        "-D", SecurityOptions.SSL_INTERNAL_KEY_PASSWORD.key() + "=changeit"};
    final CommandLine commandLine = commandLineParser.parse(cliOptions, arguments);

    final Configuration effectiveConfig = yarnCli.applyCommandLineOptionsToConfiguration(commandLine);

    // Every dynamic property must be readable through its typed option.
    assertEquals("5 min", effectiveConfig.get(AkkaOptions.ASK_TIMEOUT));
    assertEquals("-DappName=foobar", effectiveConfig.get(CoreOptions.FLINK_JVM_OPTIONS));
    assertEquals("changeit", effectiveConfig.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD));
}
示例13
/**
 * Builds the RPC URL of an endpoint, picking the transport protocol from the configuration.
 *
 * <p>The SSL protocol variant is chosen only when both {@code AkkaOptions.SSL_ENABLED} is set and
 * {@code SSLUtils.isInternalSSLEnabled(config)} reports true; otherwise plain TCP is used. The
 * actual URL is produced by the overload that takes the protocol explicitly.
 *
 * @param hostname The hostname or address where the target RPC service is listening.
 * @param port The port where the target RPC service is listening.
 * @param endpointName The name of the RPC endpoint.
 * @param addressResolution Whether to try address resolution of the given hostname or not.
 *                          This allows failing fast in case the hostname cannot be resolved.
 * @param config The configuration from which to deduce further settings; must not be null.
 *
 * @return The RPC URL of the specified RPC endpoint.
 * @throws UnknownHostException declared by this method; the delegate overload is the only call
 *                              here that can plausibly raise it — confirm against that overload.
 */
public static String getRpcUrl(
        String hostname,
        int port,
        String endpointName,
        HighAvailabilityServicesUtils.AddressResolution addressResolution,
        Configuration config) throws UnknownHostException {

    checkNotNull(config, "config is null");

    final AkkaProtocol protocol;
    if (config.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.isInternalSSLEnabled(config)) {
        protocol = AkkaProtocol.SSL_TCP;
    } else {
        protocol = AkkaProtocol.TCP;
    }

    return getRpcUrl(hostname, port, endpointName, addressResolution, protocol);
}
示例14
/**
 * Creates the Flink configuration for these tests: Akka watch heartbeat pause/interval, managed
 * memory size, a zero fixed-delay restart delay, and a JMX metrics reporter registered under the
 * name {@code my_reporter}.
 *
 * @return the populated configuration
 */
protected static Configuration getFlinkConfiguration() {
    final Configuration configuration = new Configuration();

    configuration.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
    configuration.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
    configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

    final String reporterClassKey =
        ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX;
    configuration.setString(reporterClassKey, JMXReporter.class.getName());

    return configuration;
}
示例15
/**
 * Verifies the JVM options via {@code verifyJvmOptions()} and then builds the test configuration:
 * default file system override enabled, the testing ask timeout, a memory segment size of
 * {@code "4096"} and 2048 network buffers.
 *
 * @return the populated configuration
 */
private static Configuration getConfiguration() {
    verifyJvmOptions();

    final Configuration configuration = new Configuration();
    configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
    configuration.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
    configuration.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);

    return configuration;
}
示例16
/**
 * Builds a configuration that carries the testing ask timeout and the heartbeat interval taken
 * from {@code HEARTBEAT_INTERVAL}.
 *
 * @return the populated configuration
 */
private static Configuration getConfiguration() {
    final Configuration configuration = new Configuration();

    configuration.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);

    return configuration;
}
示例17
/**
 * Ensures that the parallelism set on the environment is honoured even when a configuration is
 * supplied to the remote environment: the job must emit exactly {@code USER_DOP} subtask indices.
 */
@Test
public void testUserSpecificParallelism() throws Exception {
    final Configuration clientConfig = new Configuration();
    clientConfig.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);

    final URI restAddress = MINI_CLUSTER_RESOURCE.getRestAddres();
    final ExecutionEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
        restAddress.getHost(),
        restAddress.getPort(),
        clientConfig);
    remoteEnv.setParallelism(USER_DOP);
    remoteEnv.getConfig().disableSysoutLogging();

    // Each parallel subtask reports its own index exactly once.
    final RichMapPartitionFunction<Integer, Integer> emitSubtaskIndex =
        new RichMapPartitionFunction<Integer, Integer>() {
            @Override
            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                out.collect(getRuntimeContext().getIndexOfThisSubtask());
            }
        };

    final DataSet<Integer> subtaskIndices = remoteEnv
        .createInput(new ParallelismDependentInputFormat())
        .rebalance()
        .mapPartition(emitSubtaskIndex);

    final List<Integer> collected = subtaskIndices.collect();
    assertEquals(USER_DOP, collected.size());
}
示例18
/**
 * Builds a configuration with 48m of managed memory and 60 second Akka lookup and ask timeouts.
 *
 * @return the populated configuration
 */
private static Configuration getConfiguration() {
    final Configuration configuration = new Configuration();

    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
    configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
    configuration.setString(AkkaOptions.ASK_TIMEOUT, "60 s");

    return configuration;
}
示例19
/**
 * Transfers settings from the given variables map into the configuration.
 *
 * <p>Always enables {@code AkkaOptions.JVM_EXIT_ON_FATAL_ERROR}. If both a remote keytab path and
 * a keytab principal are present in {@code variables}, the Kerberos keytab (resolved to
 * {@code Utils.KEYTAB_FILE_NAME} under {@code currDir}) and principal are written to the
 * configuration. If a node id is present, it is used as the task manager host.
 *
 * @param configuration the configuration to update in place
 * @param currDir directory against which the local keytab file name is resolved
 * @param variables map the settings are looked up in
 * @throws IOException declared by this method; {@code UserGroupInformation.getCurrentUser()} is
 *                     the call here that presumably raises it — confirm
 */
private static void setupConfigurationFromVariables(Configuration configuration, String currDir, Map<String, String> variables) throws IOException {
    final String yarnClientUsername = variables.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);

    final String remoteKeytabPath = variables.get(YarnConfigKeys.KEYTAB_PATH);
    LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);

    final String remoteKeytabPrincipal = variables.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);

    // tell akka to die in case of an error
    configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);

    // The local keytab location is only derived when a remote keytab path was provided.
    final String localKeytabPath;
    if (remoteKeytabPath == null) {
        localKeytabPath = null;
    } else {
        localKeytabPath = new File(currDir, Utils.KEYTAB_FILE_NAME).getAbsolutePath();
        LOG.info("keytab path: {}", localKeytabPath);
    }

    final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
        currentUser.getShortUserName(), yarnClientUsername);

    final boolean kerberosSettingsComplete = localKeytabPath != null && remoteKeytabPrincipal != null;
    if (kerberosSettingsComplete) {
        configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
        configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
    }

    // use the hostname passed by job manager
    final String taskExecutorHostname = variables.get(YarnResourceManager.ENV_FLINK_NODE_ID);
    if (taskExecutorHostname != null) {
        configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname);
    }
}
示例20
/**
 * Creates a {@code ForkJoinExecutorConfiguration} from the fork-join executor options
 * (parallelism factor, minimum and maximum parallelism) in {@code AkkaOptions}.
 *
 * @param configuration the configuration to read the three options from
 * @return the executor configuration built from those values
 */
public static ForkJoinExecutorConfiguration fromConfiguration(final Configuration configuration) {
    return new ForkJoinExecutorConfiguration(
        configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR),
        configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN),
        configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX));
}
示例21
/**
 * Creates a {@code FailureRateRestartStrategyFactory} from the configuration.
 *
 * <p>Defaults applied here: one failure per interval, a failure interval of one minute, and a
 * restart delay that falls back to the value of {@code AkkaOptions.WATCH_HEARTBEAT_INTERVAL}.
 *
 * @param configuration the configuration to read the restart strategy settings from
 * @return the factory configured with the parsed values
 * @throws Exception declared by this method; parsing of the duration strings is the likely
 *                   source — confirm
 */
public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
    final int maxFailures = configuration.getInteger(
        ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);

    final String defaultIntervalText = Duration.apply(1, TimeUnit.MINUTES).toString();
    final String intervalText = configuration.getString(
        ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, defaultIntervalText);

    // The heartbeat interval doubles as the default restart delay.
    final String defaultDelayText = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
    final String delayText = configuration.getString(
        ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, defaultDelayText);

    final Duration interval = Duration.apply(intervalText);
    final Duration restartDelay = Duration.apply(delayText);

    return new FailureRateRestartStrategyFactory(
        maxFailures,
        Time.milliseconds(interval.toMillis()),
        Time.milliseconds(restartDelay.toMillis()));
}
示例22
/**
 * Starts the two RPC services shared by the tests, both on {@code localhost} with port 0 and an
 * Akka frame size of {@code maxFrameSize} bytes.
 */
@BeforeClass
public static void setup() throws Exception {
    final Configuration rpcConfig = new Configuration();
    rpcConfig.setString(AkkaOptions.FRAMESIZE, maxFrameSize + "b");

    akkaRpcService1 = AkkaRpcServiceUtils.createRpcService("localhost", 0, rpcConfig);
    akkaRpcService2 = AkkaRpcServiceUtils.createRpcService("localhost", 0, rpcConfig);
}
示例23
/**
 * Starts the two RPC services shared by the tests, both on {@code localhost} with port 0 and an
 * Akka frame size derived from {@code FRAMESIZE}.
 */
@BeforeClass
public static void setupClass() throws Exception {
    final Configuration rpcConfig = new Configuration();
    rpcConfig.setString(AkkaOptions.FRAMESIZE, FRAMESIZE + " b");

    rpcService1 = AkkaRpcServiceUtils.createRpcService("localhost", 0, rpcConfig);
    rpcService2 = AkkaRpcServiceUtils.createRpcService("localhost", 0, rpcConfig);
}
示例24
/**
 * Populates the given configuration with everything needed to run in
 * {@link HighAvailabilityMode#ZOOKEEPER}: HA mode and quorum, ZooKeeper connection/session
 * timeouts, a file system state backend with checkpoint and recovery directories, and Akka
 * failure detection settings.
 *
 * <p>The ZooKeeper timeouts are 5000, or 30000 when a {@code CI} environment variable exists.
 *
 * @param config Configuration to modify; must not be null
 * @param zooKeeperQuorum ZooKeeper quorum to connect to; must not be null
 * @param fsStateHandlePath Base path for the file system state backend (checkpoints and
 *                          recovery); must not be null
 * @return The same configuration instance, modified to operate in
 *         {@link HighAvailabilityMode#ZOOKEEPER}.
 */
public static Configuration configureZooKeeperHA(
        Configuration config,
        String zooKeeperQuorum,
        String fsStateHandlePath) {

    checkNotNull(config, "Configuration");
    checkNotNull(zooKeeperQuorum, "ZooKeeper quorum");
    checkNotNull(fsStateHandlePath, "File state handle backend path");

    // ZooKeeper recovery mode
    config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);

    // The regular timeout is too aggressive for Travis and connections are often lost,
    // so a longer one is used whenever a CI environment variable is present.
    final boolean runningOnCi = System.getenv().containsKey("CI");
    final int zooKeeperTimeout = runningOnCi ? 30000 : 5000;
    config.setInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT, zooKeeperTimeout);
    config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, zooKeeperTimeout);

    // File system state backend
    config.setString(CheckpointingOptions.STATE_BACKEND, "FILESYSTEM");
    config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, fsStateHandlePath + "/checkpoints");
    config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");

    // Akka failure detection and execution retries
    config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
    config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
    config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
    config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
    config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s");

    return config;
}
示例25
/**
 * Builds a configuration that carries the testing ask timeout and a network buffer count taken
 * from {@code NUMBER_OF_NETWORK_BUFFERS}.
 *
 * @return the populated configuration
 */
private static Configuration getFlinkConfiguration() {
    final Configuration flinkConfiguration = new Configuration();

    flinkConfiguration.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    flinkConfiguration.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);

    return flinkConfiguration;
}
示例26
/**
 * Creates the Flink configuration for these tests: Akka watch heartbeat pause/interval, managed
 * memory size, a zero fixed-delay restart delay, and a JMX metrics reporter registered under the
 * name {@code my_reporter}.
 *
 * @return the populated configuration
 */
protected static Configuration getFlinkConfiguration() {
    final Configuration configuration = new Configuration();

    configuration.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
    configuration.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
    configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

    final String reporterClassKey =
        ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX;
    configuration.setString(reporterClassKey, JMXReporter.class.getName());

    return configuration;
}
示例27
/**
 * Verifies the JVM options via {@code verifyJvmOptions()} and then builds the test configuration:
 * default file system override enabled, the testing ask timeout, a memory segment size of
 * {@code "4096"} and 2048 network buffers.
 *
 * @return the populated configuration
 */
private static Configuration getConfiguration() {
    verifyJvmOptions();

    final Configuration configuration = new Configuration();
    configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
    configuration.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
    configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048);

    return configuration;
}
示例28
/**
 * Builds a configuration that carries the testing ask timeout and the heartbeat interval taken
 * from {@code HEARTBEAT_INTERVAL}.
 *
 * @return the populated configuration
 */
private static Configuration getConfiguration() {
    final Configuration configuration = new Configuration();

    configuration.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);

    return configuration;
}
示例29
/**
 * Ensures that the parallelism set on the environment is honoured even when a configuration is
 * supplied to the remote environment: the job must emit exactly {@code USER_DOP} subtask indices.
 */
@Test
public void testUserSpecificParallelism() throws Exception {
    final Configuration clientConfig = new Configuration();
    clientConfig.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);

    final URI restAddress = MINI_CLUSTER_RESOURCE.getRestAddres();
    final ExecutionEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
        restAddress.getHost(),
        restAddress.getPort(),
        clientConfig);
    remoteEnv.setParallelism(USER_DOP);
    remoteEnv.getConfig().disableSysoutLogging();

    // Each parallel subtask reports its own index exactly once.
    final RichMapPartitionFunction<Integer, Integer> emitSubtaskIndex =
        new RichMapPartitionFunction<Integer, Integer>() {
            @Override
            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                out.collect(getRuntimeContext().getIndexOfThisSubtask());
            }
        };

    final DataSet<Integer> subtaskIndices = remoteEnv
        .createInput(new ParallelismDependentInputFormat())
        .rebalance()
        .mapPartition(emitSubtaskIndex);

    final List<Integer> collected = subtaskIndices.collect();
    assertEquals(USER_DOP, collected.size());
}
示例30
/**
 * Builds a configuration with 48m of managed memory and 60 second Akka lookup and ask timeouts.
 *
 * @return the populated configuration
 */
private static Configuration getConfiguration() {
    final Configuration configuration = new Configuration();

    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
    configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
    configuration.setString(AkkaOptions.ASK_TIMEOUT, "60 s");

    return configuration;
}