Java源码示例:org.apache.flink.configuration.AkkaOptions

示例1
/**
 * Runs a streaming job that reads integers from a {@code CustomInputFormat}, pairs each
 * value with half of itself and hands the resulting tuples to a {@code NoOpSink}.
 *
 * @param args command line arguments (not read by this method)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
	// NOTE(review): this configuration is populated but never handed to the execution
	// environment obtained below -- confirm whether that is intentional.
	final Configuration configuration = new Configuration();
	configuration.setString(AkkaOptions.ASK_TIMEOUT, "5 s");

	final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
	environment.getConfig().disableSysoutLogging();

	final DataStream<Integer> source = environment.createInput(new CustomInputFormat());

	// Kept as an anonymous class (not a lambda) exactly like the original.
	final MapFunction<Integer, Tuple2<Integer, Double>> pairWithHalf =
		new MapFunction<Integer, Tuple2<Integer, Double>>() {
			@Override
			public Tuple2<Integer, Double> map(Integer value) throws Exception {
				return new Tuple2<>(value, value * 0.5);
			}
		};

	source.map(pairWithHalf).addSink(new NoOpSink());

	environment.execute();
}
 
示例2
/**
 * Builds the cluster configuration: fixed managed memory and network buffer limits, an Akka
 * frame size derived from {@code MAX_MEM_STATE_SIZE}, and ZooKeeper high-availability settings
 * when {@code zkServer} is present.
 *
 * @return the assembled configuration
 * @throws IOException if the temporary HA storage folder cannot be created
 */
protected Configuration createClusterConfig() throws IOException {
	final TemporaryFolder tmpFolder = new TemporaryFolder();
	tmpFolder.create();
	final File haStorageDir = tmpFolder.newFolder();

	final Configuration clusterConfig = new Configuration();
	clusterConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");

	// the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
	final long maxNetworkBufferBytes = 80L << 20; // 80 MB
	clusterConfig.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(maxNetworkBufferBytes));
	clusterConfig.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");

	// Without a ZooKeeper server there is nothing HA-related to configure.
	if (zkServer == null) {
		return clusterConfig;
	}

	clusterConfig.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
	clusterConfig.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
	clusterConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toURI().toString());
	return clusterConfig;
}
 
示例3
/**
 * Builds the RPC URL of an endpoint, picking the Akka protocol from the configuration.
 *
 * <p>The SSL protocol is chosen only if {@code AkkaOptions.SSL_ENABLED} is set in the
 * configuration and {@code SSLUtils.isInternalSSLEnabled(config)} returns true; otherwise
 * plain TCP is used. The URL itself is produced by the protocol-based overload.
 *
 * @param hostname The hostname or address where the target RPC service is listening.
 * @param port The port where the target RPC service is listening.
 * @param endpointName The name of the RPC endpoint.
 * @param addressResolution Whether to try address resolution of the given hostname or not.
 *                          This allows to fail fast in case that the hostname cannot be resolved.
 * @param config The configuration from which to deduce further settings.
 *
 * @return The RPC URL of the specified RPC endpoint.
 * @throws UnknownHostException presumably if the hostname cannot be resolved by the
 *                              delegated overload
 */
public static String getRpcUrl(
	String hostname,
	int port,
	String endpointName,
	HighAvailabilityServicesUtils.AddressResolution addressResolution,
	Configuration config) throws UnknownHostException {

	checkNotNull(config, "config is null");

	// The internal SSL check is only consulted when Akka SSL is switched on.
	final boolean akkaSslRequested = config.getBoolean(AkkaOptions.SSL_ENABLED);
	final boolean useSsl = akkaSslRequested && SSLUtils.isInternalSSLEnabled(config);

	if (useSsl) {
		return getRpcUrl(hostname, port, endpointName, addressResolution, AkkaProtocol.SSL_TCP);
	}
	return getRpcUrl(hostname, port, endpointName, addressResolution, AkkaProtocol.TCP);
}
 
示例4
/**
 * Creates a {@code SlotManagerConfiguration} from the given configuration.
 *
 * @param configuration the configuration to read the timeouts and release flag from
 * @return the slot manager configuration
 * @throws ConfigurationException if the Akka ask timeout value cannot be parsed
 */
public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
	final String askTimeoutText = configuration.getString(AkkaOptions.ASK_TIMEOUT);

	final Time rpcTimeout;
	try {
		final long rpcTimeoutMillis = Duration.apply(askTimeoutText).toMillis();
		rpcTimeout = Time.milliseconds(rpcTimeoutMillis);
	} catch (NumberFormatException e) {
		throw new ConfigurationException(
			"Could not parse the resource manager's timeout value " + AkkaOptions.ASK_TIMEOUT + '.',
			e);
	}

	final Time slotRequestTimeout = getSlotRequestTimeout(configuration);

	final long taskManagerTimeoutMillis = configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT);
	final Time taskManagerTimeout = Time.milliseconds(taskManagerTimeoutMillis);

	final boolean releaseOnlyWhenResultConsumed =
		configuration.getBoolean(ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);

	return new SlotManagerConfiguration(
		rpcTimeout,
		slotRequestTimeout,
		taskManagerTimeout,
		releaseOnlyWhenResultConsumed);
}
 
示例5
/**
 * Prepares a mocked {@code PackagedProgram} that returns a small sequence-generating plan,
 * and a configuration pointing at a job manager on localhost with a free port.
 */
@Before
public void setUp() throws Exception {

	// A trivial batch program: generate 1..1000 and discard the output.
	final ExecutionEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment();
	localEnv.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());

	final Plan programPlan = localEnv.createProgramPlan();
	final JobWithJars planWithJars = new JobWithJars(
		programPlan,
		Collections.<URL>emptyList(),
		Collections.<URL>emptyList());

	program = mock(PackagedProgram.class);
	when(program.getPlanWithJars()).thenReturn(planWithJars);

	final int jobManagerPort = NetUtils.getAvailablePort();

	config = new Configuration();
	config.setString(JobManagerOptions.ADDRESS, "localhost");
	config.setInteger(JobManagerOptions.PORT, jobManagerPort);
	config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
}
 
示例6
/**
 * Runs a streaming job that reads integers from a {@code CustomInputFormat}, pairs each
 * value with half of itself and sends the resulting tuples to a {@code DiscardingSink}.
 *
 * @param args command line arguments (not read by this method)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
	// NOTE(review): this configuration is populated but never handed to the execution
	// environment obtained below -- confirm whether that is intentional.
	final Configuration configuration = new Configuration();
	configuration.setString(AkkaOptions.ASK_TIMEOUT, "5 s");

	final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
	environment.getConfig().disableSysoutLogging();

	final DataStream<Integer> source = environment.createInput(new CustomInputFormat());

	// Kept as an anonymous class (not a lambda) exactly like the original.
	final MapFunction<Integer, Tuple2<Integer, Double>> pairWithHalf =
		new MapFunction<Integer, Tuple2<Integer, Double>>() {
			@Override
			public Tuple2<Integer, Double> map(Integer value) throws Exception {
				return new Tuple2<>(value, value * 0.5);
			}
		};

	source.map(pairWithHalf).addSink(new DiscardingSink<>());

	environment.execute();
}
 
示例7
/**
 * Builds the cluster configuration: fixed managed memory and network buffer limits, an Akka
 * frame size derived from {@code MAX_MEM_STATE_SIZE}, and ZooKeeper high-availability settings
 * when {@code zkServer} is present.
 *
 * @return the assembled configuration
 * @throws IOException if the temporary HA storage folder cannot be created
 */
protected Configuration createClusterConfig() throws IOException {
	final TemporaryFolder tmpFolder = new TemporaryFolder();
	tmpFolder.create();
	final File haStorageDir = tmpFolder.newFolder();

	final Configuration clusterConfig = new Configuration();
	clusterConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");

	// the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
	final long maxNetworkBufferBytes = 80L << 20; // 80 MB
	clusterConfig.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(maxNetworkBufferBytes));
	clusterConfig.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");

	// Without a ZooKeeper server there is nothing HA-related to configure.
	if (zkServer == null) {
		return clusterConfig;
	}

	clusterConfig.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
	clusterConfig.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
	clusterConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toURI().toString());
	return clusterConfig;
}
 
示例8
/**
 * Builds the RPC URL of an endpoint, picking the Akka protocol from the configuration.
 *
 * <p>The SSL protocol is chosen only if {@code AkkaOptions.SSL_ENABLED} is set in the
 * configuration and {@code SSLUtils.isInternalSSLEnabled(config)} returns true; otherwise
 * plain TCP is used. The URL itself is produced by the protocol-based overload.
 *
 * @param hostname The hostname or address where the target RPC service is listening.
 * @param port The port where the target RPC service is listening.
 * @param endpointName The name of the RPC endpoint.
 * @param addressResolution Whether to try address resolution of the given hostname or not.
 *                          This allows to fail fast in case that the hostname cannot be resolved.
 * @param config The configuration from which to deduce further settings.
 *
 * @return The RPC URL of the specified RPC endpoint.
 * @throws UnknownHostException presumably if the hostname cannot be resolved by the
 *                              delegated overload
 */
public static String getRpcUrl(
	String hostname,
	int port,
	String endpointName,
	HighAvailabilityServicesUtils.AddressResolution addressResolution,
	Configuration config) throws UnknownHostException {

	checkNotNull(config, "config is null");

	// The internal SSL check is only consulted when Akka SSL is switched on.
	final boolean akkaSslRequested = config.getBoolean(AkkaOptions.SSL_ENABLED);
	final boolean useSsl = akkaSslRequested && SSLUtils.isInternalSSLEnabled(config);

	if (useSsl) {
		return getRpcUrl(hostname, port, endpointName, addressResolution, AkkaProtocol.SSL_TCP);
	}
	return getRpcUrl(hostname, port, endpointName, addressResolution, AkkaProtocol.TCP);
}
 
示例9
/**
 * Creates a {@code SlotManagerConfiguration} from the given configuration.
 *
 * @param configuration the configuration to read the timeouts and release flag from
 * @return the slot manager configuration
 * @throws ConfigurationException if the Akka ask timeout value cannot be parsed
 */
public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
	final String askTimeoutText = configuration.getString(AkkaOptions.ASK_TIMEOUT);

	final Time rpcTimeout;
	try {
		final long rpcTimeoutMillis = Duration.apply(askTimeoutText).toMillis();
		rpcTimeout = Time.milliseconds(rpcTimeoutMillis);
	} catch (NumberFormatException e) {
		throw new ConfigurationException(
			"Could not parse the resource manager's timeout value " + AkkaOptions.ASK_TIMEOUT + '.',
			e);
	}

	final Time slotRequestTimeout = getSlotRequestTimeout(configuration);

	final long taskManagerTimeoutMillis = configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT);
	final Time taskManagerTimeout = Time.milliseconds(taskManagerTimeoutMillis);

	final boolean releaseOnlyWhenResultConsumed =
		configuration.getBoolean(ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);

	return new SlotManagerConfiguration(
		rpcTimeout,
		slotRequestTimeout,
		taskManagerTimeout,
		releaseOnlyWhenResultConsumed);
}
 
示例10
/**
 * Prepares a mocked {@code PackagedProgram} that returns a small sequence-generating plan,
 * and a configuration pointing at a job manager on localhost with a free port.
 */
@Before
public void setUp() throws Exception {

	// A trivial batch program: generate 1..1000 and discard the output.
	final ExecutionEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment();
	localEnv.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());

	final Plan programPlan = localEnv.createProgramPlan();
	final JobWithJars planWithJars = new JobWithJars(
		programPlan,
		Collections.<URL>emptyList(),
		Collections.<URL>emptyList());

	program = mock(PackagedProgram.class);
	when(program.getPlanWithJars()).thenReturn(planWithJars);

	final int jobManagerPort = NetUtils.getAvailablePort();

	config = new Configuration();
	config.setString(JobManagerOptions.ADDRESS, "localhost");
	config.setInteger(JobManagerOptions.PORT, jobManagerPort);
	config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
}
 
示例11
/**
 * Runs a streaming job that reads integers from a {@code CustomInputFormat}, pairs each
 * value with half of itself and sends the resulting tuples to a {@code DiscardingSink}.
 *
 * @param args command line arguments (not read by this method)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
	// NOTE(review): this configuration is populated but never handed to the execution
	// environment obtained below -- confirm whether that is intentional.
	final Configuration configuration = new Configuration();
	configuration.setString(AkkaOptions.ASK_TIMEOUT, "5 s");

	final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

	final DataStream<Integer> source = environment.createInput(new CustomInputFormat());

	// Kept as an anonymous class (not a lambda) exactly like the original.
	final MapFunction<Integer, Tuple2<Integer, Double>> pairWithHalf =
		new MapFunction<Integer, Tuple2<Integer, Double>>() {
			@Override
			public Tuple2<Integer, Double> map(Integer value) throws Exception {
				return new Tuple2<>(value, value * 0.5);
			}
		};

	source.map(pairWithHalf).addSink(new DiscardingSink<>());

	environment.execute();
}
 
示例12
/**
 * Checks that {@code -D key=value} arguments on the command line end up in the configuration
 * produced by {@code applyCommandLineOptionsToConfiguration}.
 */
@Test
public void testDynamicProperties() throws Exception {

	final FlinkYarnSessionCli yarnCli = new FlinkYarnSessionCli(
		new Configuration(),
		tmp.getRoot().getAbsolutePath(),
		"",
		"",
		false);

	final Options cliOptions = new Options();
	yarnCli.addGeneralOptions(cliOptions);
	yarnCli.addRunOptions(cliOptions);

	final CommandLineParser parser = new DefaultParser();

	// Three dynamic properties, each passed as its own "-D" argument.
	final String[] arguments = {
		"run", "-j", "fake.jar",
		"-D", AkkaOptions.ASK_TIMEOUT.key() + "=5 min",
		"-D", CoreOptions.FLINK_JVM_OPTIONS.key() + "=-DappName=foobar",
		"-D", SecurityOptions.SSL_INTERNAL_KEY_PASSWORD.key() + "=changeit"
	};
	final CommandLine commandLine = parser.parse(cliOptions, arguments);

	final Configuration effectiveConfig = yarnCli.applyCommandLineOptionsToConfiguration(commandLine);

	assertEquals("5 min", effectiveConfig.get(AkkaOptions.ASK_TIMEOUT));
	assertEquals("-DappName=foobar", effectiveConfig.get(CoreOptions.FLINK_JVM_OPTIONS));
	assertEquals("changeit", effectiveConfig.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD));
}
 
示例13
/**
 * Builds the RPC URL of an endpoint, picking the Akka protocol from the configuration.
 *
 * <p>The SSL protocol is chosen only if {@code AkkaOptions.SSL_ENABLED} is set in the
 * configuration and {@code SSLUtils.isInternalSSLEnabled(config)} returns true; otherwise
 * plain TCP is used. The URL itself is produced by the protocol-based overload.
 *
 * @param hostname The hostname or address where the target RPC service is listening.
 * @param port The port where the target RPC service is listening.
 * @param endpointName The name of the RPC endpoint.
 * @param addressResolution Whether to try address resolution of the given hostname or not.
 *                          This allows to fail fast in case that the hostname cannot be resolved.
 * @param config The configuration from which to deduce further settings.
 *
 * @return The RPC URL of the specified RPC endpoint.
 * @throws UnknownHostException presumably if the hostname cannot be resolved by the
 *                              delegated overload
 */
public static String getRpcUrl(
	String hostname,
	int port,
	String endpointName,
	HighAvailabilityServicesUtils.AddressResolution addressResolution,
	Configuration config) throws UnknownHostException {

	checkNotNull(config, "config is null");

	// The internal SSL check is only consulted when Akka SSL is switched on.
	final boolean akkaSslRequested = config.getBoolean(AkkaOptions.SSL_ENABLED);
	final boolean useSsl = akkaSslRequested && SSLUtils.isInternalSSLEnabled(config);

	if (useSsl) {
		return getRpcUrl(hostname, port, endpointName, addressResolution, AkkaProtocol.SSL_TCP);
	}
	return getRpcUrl(hostname, port, endpointName, addressResolution, AkkaProtocol.TCP);
}
 
示例14
/**
 * Creates the Flink configuration: Akka watch heartbeat settings, a small managed memory
 * size, a zero fixed-delay restart delay and a JMX metrics reporter named "my_reporter".
 *
 * @return a freshly created configuration
 */
protected static Configuration getFlinkConfiguration() {
	final Configuration configuration = new Configuration();

	configuration.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
	configuration.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
	configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
	configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

	// Key under which the class of the reporter "my_reporter" is registered.
	final String reporterClassKey =
		ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX;
	configuration.setString(reporterClassKey, JMXReporter.class.getName());

	return configuration;
}
 
示例15
/**
 * Verifies the JVM options and then creates the configuration: file system default override
 * enabled, the testing Akka ask timeout, a 4096 memory segment size and 2048 network buffers.
 *
 * @return a freshly created configuration
 */
private static Configuration getConfiguration() {
	verifyJvmOptions();

	final Configuration configuration = new Configuration();
	configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);

	final String askTimeout = TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT();
	configuration.setString(AkkaOptions.ASK_TIMEOUT, askTimeout);

	configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
	configuration.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
	return configuration;
}
 
示例16
/**
 * Creates a configuration carrying the testing Akka ask timeout and the heartbeat interval
 * taken from {@code HEARTBEAT_INTERVAL}.
 *
 * @return a freshly created configuration
 */
private static Configuration getConfiguration() {
	final Configuration configuration = new Configuration();

	final String askTimeout = TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT();
	configuration.setString(AkkaOptions.ASK_TIMEOUT, askTimeout);
	configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
	return configuration;
}
 
示例17
/**
 * Ensure that the program parallelism can be set even if the configuration is supplied.
 *
 * <p>The job emits the subtask index of every partition; the number of collected indices
 * must equal {@code USER_DOP}.
 */
@Test
public void testUserSpecificParallelism() throws Exception {
	final Configuration clientConfig = new Configuration();
	clientConfig.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);

	final URI clusterRestUri = MINI_CLUSTER_RESOURCE.getRestAddres();
	final String restHost = clusterRestUri.getHost();
	final int restPort = clusterRestUri.getPort();

	final ExecutionEnvironment remoteEnv =
		ExecutionEnvironment.createRemoteEnvironment(restHost, restPort, clientConfig);
	remoteEnv.setParallelism(USER_DOP);
	remoteEnv.getConfig().disableSysoutLogging();

	final DataSet<Integer> subtaskIndices = remoteEnv
		.createInput(new ParallelismDependentInputFormat())
		.rebalance()
		.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
			@Override
			public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
				// One record per partition: the index of the subtask processing it.
				out.collect(getRuntimeContext().getIndexOfThisSubtask());
			}
		});

	final List<Integer> collected = subtaskIndices.collect();
	assertEquals(USER_DOP, collected.size());
}
 
示例18
/**
 * Creates a configuration with 48m of managed memory and 60 second Akka lookup and ask
 * timeouts.
 *
 * @return a freshly created configuration
 */
private static Configuration getConfiguration() {
	final String akkaTimeout = "60 s";

	final Configuration configuration = new Configuration();
	configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
	configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, akkaTimeout);
	configuration.setString(AkkaOptions.ASK_TIMEOUT, akkaTimeout);
	return configuration;
}
 
示例19
/**
 * Applies settings derived from the given variables to the configuration: enables Akka's
 * JVM-exit-on-fatal-error flag, registers the Kerberos keytab and principal when both are
 * available, and sets the task manager host when a node id is present.
 *
 * @param configuration the configuration to modify
 * @param currDir the directory in which the keytab file is looked up
 * @param variables the variables to read the settings from
 * @throws IOException if the current user cannot be determined
 */
private static void setupConfigurationFromVariables(Configuration configuration, String currDir, Map<String, String> variables) throws IOException {
	final String clientUserName = variables.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);

	final String keytabRemotePath = variables.get(YarnConfigKeys.KEYTAB_PATH);
	LOG.info("TM: remote keytab path obtained {}", keytabRemotePath);

	final String keytabPrincipal = variables.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
	LOG.info("TM: remote keytab principal obtained {}", keytabPrincipal);

	// tell akka to die in case of an error
	configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);

	// The local keytab file is only resolved when a remote keytab path was provided.
	final String localKeytabPath;
	if (keytabRemotePath == null) {
		localKeytabPath = null;
	} else {
		localKeytabPath = new File(currDir, Utils.KEYTAB_FILE_NAME).getAbsolutePath();
		LOG.info("keytab path: {}", localKeytabPath);
	}

	final UserGroupInformation daemonUser = UserGroupInformation.getCurrentUser();
	LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
			daemonUser.getShortUserName(), clientUserName);

	final boolean kerberosLoginAvailable = localKeytabPath != null && keytabPrincipal != null;
	if (kerberosLoginAvailable) {
		configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
		configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, keytabPrincipal);
	}

	// use the hostname passed by job manager
	final String nodeHostname = variables.get(YarnResourceManager.ENV_FLINK_NODE_ID);
	if (nodeHostname != null) {
		configuration.setString(TaskManagerOptions.HOST, nodeHostname);
	}
}
 
示例20
/**
 * Reads the fork-join executor parallelism factor, minimum and maximum from the given
 * configuration.
 *
 * @param configuration the configuration to read from
 * @return the fork-join executor configuration built from those three values
 */
public static ForkJoinExecutorConfiguration fromConfiguration(final Configuration configuration) {
	// Arguments are evaluated left to right: factor, then minimum, then maximum.
	return new ForkJoinExecutorConfiguration(
		configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR),
		configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN),
		configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX));
}
 
示例21
/**
 * Creates a {@code FailureRateRestartStrategyFactory} from the given configuration.
 *
 * <p>Defaults: one failure per interval, an interval of one minute, and a delay equal to
 * the configured Akka watch heartbeat interval.
 *
 * @param configuration the configuration to read the restart strategy settings from
 * @return the factory configured with the parsed values
 * @throws Exception declared by the original signature; duration parsing may fail
 */
public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
	final int maxFailures = configuration.getInteger(
			ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);

	final String defaultIntervalText = Duration.apply(1, TimeUnit.MINUTES).toString();
	final String intervalText = configuration.getString(
			ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, defaultIntervalText);

	// The heartbeat interval doubles as the fallback for the restart delay.
	final String heartbeatIntervalText = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
	final String delayText = configuration.getString(
			ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, heartbeatIntervalText);

	// Parse both durations before converting either of them.
	final Duration parsedInterval = Duration.apply(intervalText);
	final Duration parsedDelay = Duration.apply(delayText);

	final Time failuresInterval = Time.milliseconds(parsedInterval.toMillis());
	final Time restartDelay = Time.milliseconds(parsedDelay.toMillis());

	return new FailureRateRestartStrategyFactory(maxFailures, failuresInterval, restartDelay);
}
 
示例22
/**
 * Starts two Akka RPC services on localhost that share one configuration, whose frame size
 * is {@code maxFrameSize} bytes.
 */
@BeforeClass
public static void setup() throws Exception {
	final String frameSizeInBytes = maxFrameSize + "b";

	final Configuration rpcConfig = new Configuration();
	rpcConfig.setString(AkkaOptions.FRAMESIZE, frameSizeInBytes);

	// Port 0 is passed for both services.
	akkaRpcService1 = AkkaRpcServiceUtils.createRpcService("localhost", 0, rpcConfig);
	akkaRpcService2 = AkkaRpcServiceUtils.createRpcService("localhost", 0, rpcConfig);
}
 
示例23
/**
 * Starts two RPC services on localhost that share one configuration, whose Akka frame size
 * is derived from {@code FRAMESIZE}.
 */
@BeforeClass
public static void setupClass() throws Exception {
	final String frameSizeInBytes = FRAMESIZE + " b";

	final Configuration rpcConfig = new Configuration();
	rpcConfig.setString(AkkaOptions.FRAMESIZE, frameSizeInBytes);

	// Port 0 is passed for both services.
	rpcService1 = AkkaRpcServiceUtils.createRpcService("localhost", 0, rpcConfig);
	rpcService2 = AkkaRpcServiceUtils.createRpcService("localhost", 0, rpcConfig);
}
 
示例24
/**
 * Sets all necessary configuration keys to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 *
 * <p>The given configuration is modified in place and returned. The ZooKeeper connection
 * and session timeouts are 5000, or 30000 when the environment variable {@code CI} is set.
 *
 * @param config            Configuration to use
 * @param zooKeeperQuorum   ZooKeeper quorum to connect to
 * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
 *                          recovery)
 * @return The modified configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 */
public static Configuration configureZooKeeperHA(
		Configuration config,
		String zooKeeperQuorum,
		String fsStateHandlePath) {

	checkNotNull(config, "Configuration");
	checkNotNull(zooKeeperQuorum, "ZooKeeper quorum");
	checkNotNull(fsStateHandlePath, "File state handle backend path");

	// ZooKeeper recovery mode
	config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
	config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);

	// The regular timeout is too aggressive for Travis and connections are often lost,
	// so a longer one is used whenever the CI environment variable is present.
	final boolean runningOnCi = System.getenv().containsKey("CI");
	final int zkTimeout = runningOnCi ? 30000 : 5000;

	config.setInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT, zkTimeout);
	config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, zkTimeout);

	// File system state backend
	final String checkpointsPath = fsStateHandlePath + "/checkpoints";
	final String recoveryPath = fsStateHandlePath + "/recovery";
	config.setString(CheckpointingOptions.STATE_BACKEND, "FILESYSTEM");
	config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointsPath);
	config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryPath);

	// Akka failure detection and execution retries
	config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
	config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
	config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
	config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
	config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s");

	return config;
}
 
示例25
/**
 * Creates a configuration carrying the testing Akka ask timeout and the number of network
 * buffers taken from {@code NUMBER_OF_NETWORK_BUFFERS}.
 *
 * @return a freshly created configuration
 */
private static Configuration getFlinkConfiguration() {
	final Configuration flinkConfig = new Configuration();

	final String askTimeout = TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT();
	flinkConfig.setString(AkkaOptions.ASK_TIMEOUT, askTimeout);
	flinkConfig.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
	return flinkConfig;
}
 
示例26
/**
 * Creates the Flink configuration: Akka watch heartbeat settings, a small managed memory
 * size, a zero fixed-delay restart delay and a JMX metrics reporter named "my_reporter".
 *
 * @return a freshly created configuration
 */
protected static Configuration getFlinkConfiguration() {
	final Configuration configuration = new Configuration();

	configuration.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
	configuration.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
	configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
	configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

	// Key under which the class of the reporter "my_reporter" is registered.
	final String reporterClassKey =
		ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX;
	configuration.setString(reporterClassKey, JMXReporter.class.getName());

	return configuration;
}
 
示例27
/**
 * Verifies the JVM options and then creates the configuration: file system default override
 * enabled, the testing Akka ask timeout, a 4096 memory segment size and 2048 network buffers.
 *
 * @return a freshly created configuration
 */
private static Configuration getConfiguration() {
	verifyJvmOptions();

	final Configuration configuration = new Configuration();
	configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);

	final String askTimeout = TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT();
	configuration.setString(AkkaOptions.ASK_TIMEOUT, askTimeout);

	configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
	configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048);
	return configuration;
}
 
示例28
/**
 * Creates a configuration carrying the testing Akka ask timeout and the heartbeat interval
 * taken from {@code HEARTBEAT_INTERVAL}.
 *
 * @return a freshly created configuration
 */
private static Configuration getConfiguration() {
	final Configuration configuration = new Configuration();

	final String askTimeout = TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT();
	configuration.setString(AkkaOptions.ASK_TIMEOUT, askTimeout);
	configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
	return configuration;
}
 
示例29
/**
 * Ensure that the program parallelism can be set even if the configuration is supplied.
 *
 * <p>The job emits the subtask index of every partition; the number of collected indices
 * must equal {@code USER_DOP}.
 */
@Test
public void testUserSpecificParallelism() throws Exception {
	final Configuration clientConfig = new Configuration();
	clientConfig.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);

	final URI clusterRestUri = MINI_CLUSTER_RESOURCE.getRestAddres();
	final String restHost = clusterRestUri.getHost();
	final int restPort = clusterRestUri.getPort();

	final ExecutionEnvironment remoteEnv =
		ExecutionEnvironment.createRemoteEnvironment(restHost, restPort, clientConfig);
	remoteEnv.setParallelism(USER_DOP);
	remoteEnv.getConfig().disableSysoutLogging();

	final DataSet<Integer> subtaskIndices = remoteEnv
		.createInput(new ParallelismDependentInputFormat())
		.rebalance()
		.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
			@Override
			public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
				// One record per partition: the index of the subtask processing it.
				out.collect(getRuntimeContext().getIndexOfThisSubtask());
			}
		});

	final List<Integer> collected = subtaskIndices.collect();
	assertEquals(USER_DOP, collected.size());
}
 
示例30
/**
 * Creates a configuration with 48m of managed memory and 60 second Akka lookup and ask
 * timeouts.
 *
 * @return a freshly created configuration
 */
private static Configuration getConfiguration() {
	final String akkaTimeout = "60 s";

	final Configuration configuration = new Configuration();
	configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
	configuration.setString(AkkaOptions.LOOKUP_TIMEOUT, akkaTimeout);
	configuration.setString(AkkaOptions.ASK_TIMEOUT, akkaTimeout);
	return configuration;
}