Java源码示例:org.apache.flink.runtime.state.memory.MemoryStateBackendFactory

示例1
/**
 * Checks that a {@link MemoryStateBackend} is produced when the configuration selects it,
 * once via the short name and once via the fully qualified factory class name.
 */
@Test
public void testLoadMemoryStateBackendNoParameters() throws Exception {
	// The short name is written as a literal on purpose (instead of referencing
	// AbstractStateBackend#X_STATE_BACKEND_NAME), so that a config-breaking rename is caught here.
	final Configuration shortNameConfig = new Configuration();
	shortNameConfig.setString(backendKey, "jobmanager");

	// Second variant: select the backend through its factory class name.
	final String factoryClassName = MemoryStateBackendFactory.class.getName();
	final Configuration factoryClassConfig = new Configuration();
	factoryClassConfig.setString(backendKey, factoryClassName);

	final StateBackend loadedByShortName =
			StateBackendLoader.loadStateBackendFromConfig(shortNameConfig, cl, null);
	final StateBackend loadedByFactoryClass =
			StateBackendLoader.loadStateBackendFromConfig(factoryClassConfig, cl, null);

	// Both ways of naming the backend must yield the memory state backend.
	assertTrue(loadedByShortName instanceof MemoryStateBackend);
	assertTrue(loadedByFactoryClass instanceof MemoryStateBackend);
}
 
示例2
/**
 * Checks that a {@link MemoryStateBackend} is produced when the configuration selects it,
 * once via the short name and once via the fully qualified factory class name.
 */
@Test
public void testLoadMemoryStateBackendNoParameters() throws Exception {
	// The short name is written as a literal on purpose (instead of referencing
	// AbstractStateBackend#X_STATE_BACKEND_NAME), so that a config-breaking rename is caught here.
	final Configuration shortNameConfig = new Configuration();
	shortNameConfig.setString(backendKey, "jobmanager");

	// Second variant: select the backend through its factory class name.
	final String factoryClassName = MemoryStateBackendFactory.class.getName();
	final Configuration factoryClassConfig = new Configuration();
	factoryClassConfig.setString(backendKey, factoryClassName);

	final StateBackend loadedByShortName =
			StateBackendLoader.loadStateBackendFromConfig(shortNameConfig, cl, null);
	final StateBackend loadedByFactoryClass =
			StateBackendLoader.loadStateBackendFromConfig(factoryClassConfig, cl, null);

	// Both ways of naming the backend must yield the memory state backend.
	assertTrue(loadedByShortName instanceof MemoryStateBackend);
	assertTrue(loadedByFactoryClass instanceof MemoryStateBackend);
}
 
示例3
/**
 * Checks that a {@link MemoryStateBackend} is produced when the configuration selects it,
 * once via the short name and once via the fully qualified factory class name.
 */
@Test
public void testLoadMemoryStateBackendNoParameters() throws Exception {
	// The short name is written as a literal on purpose (instead of referencing
	// AbstractStateBackend#X_STATE_BACKEND_NAME), so that a config-breaking rename is caught here.
	final Configuration shortNameConfig = new Configuration();
	shortNameConfig.setString(backendKey, "jobmanager");

	// Second variant: select the backend through its factory class name.
	final String factoryClassName = MemoryStateBackendFactory.class.getName();
	final Configuration factoryClassConfig = new Configuration();
	factoryClassConfig.setString(backendKey, factoryClassName);

	final StateBackend loadedByShortName =
			StateBackendLoader.loadStateBackendFromConfig(shortNameConfig, cl, null);
	final StateBackend loadedByFactoryClass =
			StateBackendLoader.loadStateBackendFromConfig(factoryClassConfig, cl, null);

	// Both ways of naming the backend must yield the memory state backend.
	assertTrue(loadedByShortName instanceof MemoryStateBackend);
	assertTrue(loadedByFactoryClass instanceof MemoryStateBackend);
}
 
示例4
/**
 * Checks that the checkpoint directory, the savepoint directory and the asynchronous-snapshot
 * flag set in the configuration are reflected by a memory state backend loaded from it.
 */
@Test
public void testLoadMemoryStateWithParameters() throws Exception {
	// Two fresh temporary folders, expressed as path strings for the configuration.
	final String checkpointLocation = new Path(tmp.newFolder().toURI()).toString();
	final String savepointLocation = new Path(tmp.newFolder().toURI()).toString();
	final Path wantedCheckpointPath = new Path(checkpointLocation);
	final Path wantedSavepointPath = new Path(savepointLocation);

	// Use the negation of the option's default value as the configured setting.
	final boolean asyncSetting = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();

	// The short name is written as a literal on purpose (instead of referencing
	// AbstractStateBackend#X_STATE_BACKEND_NAME), so that a config-breaking rename is caught here.
	final Configuration shortNameConfig = new Configuration();
	shortNameConfig.setString(backendKey, "jobmanager");
	shortNameConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointLocation);
	shortNameConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointLocation);
	shortNameConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, asyncSetting);

	// Same settings, but the backend is selected through its factory class name.
	final Configuration factoryClassConfig = new Configuration();
	factoryClassConfig.setString(backendKey, MemoryStateBackendFactory.class.getName());
	factoryClassConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointLocation);
	factoryClassConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointLocation);
	factoryClassConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, asyncSetting);

	final MemoryStateBackend fromShortName = (MemoryStateBackend)
			StateBackendLoader.loadStateBackendFromConfig(shortNameConfig, cl, null);
	final MemoryStateBackend fromFactoryClass = (MemoryStateBackend)
			StateBackendLoader.loadStateBackendFromConfig(factoryClassConfig, cl, null);

	assertNotNull(fromShortName);
	assertNotNull(fromFactoryClass);

	// Each configured value must be visible on both loaded backends.
	assertEquals(wantedCheckpointPath, fromShortName.getCheckpointPath());
	assertEquals(wantedCheckpointPath, fromFactoryClass.getCheckpointPath());
	assertEquals(wantedSavepointPath, fromShortName.getSavepointPath());
	assertEquals(wantedSavepointPath, fromFactoryClass.getSavepointPath());
	assertEquals(asyncSetting, fromShortName.isUsingAsynchronousSnapshots());
	assertEquals(asyncSetting, fromFactoryClass.isUsingAsynchronousSnapshots());
}
 
示例5
/**
 * Checks that the checkpoint directory, the savepoint directory and the asynchronous-snapshot
 * flag set in the configuration are reflected by a memory state backend loaded from it.
 */
@Test
public void testLoadMemoryStateWithParameters() throws Exception {
	// Two fresh temporary folders, expressed as path strings for the configuration.
	final String checkpointLocation = new Path(tmp.newFolder().toURI()).toString();
	final String savepointLocation = new Path(tmp.newFolder().toURI()).toString();
	final Path wantedCheckpointPath = new Path(checkpointLocation);
	final Path wantedSavepointPath = new Path(savepointLocation);

	// Use the negation of the option's default value as the configured setting.
	final boolean asyncSetting = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();

	// The short name is written as a literal on purpose (instead of referencing
	// AbstractStateBackend#X_STATE_BACKEND_NAME), so that a config-breaking rename is caught here.
	final Configuration shortNameConfig = new Configuration();
	shortNameConfig.setString(backendKey, "jobmanager");
	shortNameConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointLocation);
	shortNameConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointLocation);
	shortNameConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, asyncSetting);

	// Same settings, but the backend is selected through its factory class name.
	final Configuration factoryClassConfig = new Configuration();
	factoryClassConfig.setString(backendKey, MemoryStateBackendFactory.class.getName());
	factoryClassConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointLocation);
	factoryClassConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointLocation);
	factoryClassConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, asyncSetting);

	final MemoryStateBackend fromShortName = (MemoryStateBackend)
			StateBackendLoader.loadStateBackendFromConfig(shortNameConfig, cl, null);
	final MemoryStateBackend fromFactoryClass = (MemoryStateBackend)
			StateBackendLoader.loadStateBackendFromConfig(factoryClassConfig, cl, null);

	assertNotNull(fromShortName);
	assertNotNull(fromFactoryClass);

	// Each configured value must be visible on both loaded backends.
	assertEquals(wantedCheckpointPath, fromShortName.getCheckpointPath());
	assertEquals(wantedCheckpointPath, fromFactoryClass.getCheckpointPath());
	assertEquals(wantedSavepointPath, fromShortName.getSavepointPath());
	assertEquals(wantedSavepointPath, fromFactoryClass.getSavepointPath());
	assertEquals(asyncSetting, fromShortName.isUsingAsynchronousSnapshots());
	assertEquals(asyncSetting, fromFactoryClass.isUsingAsynchronousSnapshots());
}
 
示例6
/**
 * Checks that the checkpoint directory, the savepoint directory and the asynchronous-snapshot
 * flag set in the configuration are reflected by a memory state backend loaded from it.
 */
@Test
public void testLoadMemoryStateWithParameters() throws Exception {
	// Two fresh temporary folders, expressed as path strings for the configuration.
	final String checkpointLocation = new Path(tmp.newFolder().toURI()).toString();
	final String savepointLocation = new Path(tmp.newFolder().toURI()).toString();
	final Path wantedCheckpointPath = new Path(checkpointLocation);
	final Path wantedSavepointPath = new Path(savepointLocation);

	// Use the negation of the option's default value as the configured setting.
	final boolean asyncSetting = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();

	// The short name is written as a literal on purpose (instead of referencing
	// AbstractStateBackend#X_STATE_BACKEND_NAME), so that a config-breaking rename is caught here.
	final Configuration shortNameConfig = new Configuration();
	shortNameConfig.setString(backendKey, "jobmanager");
	shortNameConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointLocation);
	shortNameConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointLocation);
	shortNameConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, asyncSetting);

	// Same settings, but the backend is selected through its factory class name.
	final Configuration factoryClassConfig = new Configuration();
	factoryClassConfig.setString(backendKey, MemoryStateBackendFactory.class.getName());
	factoryClassConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointLocation);
	factoryClassConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointLocation);
	factoryClassConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, asyncSetting);

	final MemoryStateBackend fromShortName = (MemoryStateBackend)
			StateBackendLoader.loadStateBackendFromConfig(shortNameConfig, cl, null);
	final MemoryStateBackend fromFactoryClass = (MemoryStateBackend)
			StateBackendLoader.loadStateBackendFromConfig(factoryClassConfig, cl, null);

	assertNotNull(fromShortName);
	assertNotNull(fromFactoryClass);

	// Each configured value must be visible on both loaded backends.
	assertEquals(wantedCheckpointPath, fromShortName.getCheckpointPath());
	assertEquals(wantedCheckpointPath, fromFactoryClass.getCheckpointPath());
	assertEquals(wantedSavepointPath, fromShortName.getSavepointPath());
	assertEquals(wantedSavepointPath, fromFactoryClass.getSavepointPath());
	assertEquals(asyncSetting, fromShortName.isUsingAsynchronousSnapshots());
	assertEquals(asyncSetting, fromFactoryClass.isUsingAsynchronousSnapshots());
}
 
示例7
/**
 * Resolves the state backend to use, trying three sources in order of precedence:
 *
 * <ol>
 *   <li>the application-defined state backend, if one is given,</li>
 *   <li>the state backend loaded from the configuration via
 *       {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} (parameter
 *       'state.backend', as defined in {@link CheckpointingOptions#STATE_BACKEND}),</li>
 *   <li>a default backend created through {@link MemoryStateBackendFactory}.</li>
 * </ol>
 *
 * <p>If an application-defined state backend is given and it is a
 * {@link ConfigurableStateBackend}, this method calls
 * {@link ConfigurableStateBackend#configure(Configuration, ClassLoader)} on it and returns the
 * result. Any other application-defined backend is returned unchanged.
 *
 * @param fromApplication The state backend defined by the application, or null if none was defined
 * @param config The configuration to load the state backend from (must not be null)
 * @param classLoader The class loader that should be used to load the state backend (must not be null)
 * @param logger Optionally, a logger to log actions to (may be null)
 *
 * @return The instantiated state backend.
 *
 * @throws DynamicCodeLoadingException
 *             Thrown if a state backend factory is configured and the factory class was not
 *             found or the factory could not be instantiated
 * @throws IllegalConfigurationException
 *             May be thrown by the StateBackendFactory when creating / configuring the state
 *             backend in the factory
 * @throws IOException
 *             May be thrown by the StateBackendFactory when instantiating the state backend
 */
public static StateBackend fromApplicationOrConfigOrDefault(
		@Nullable StateBackend fromApplication,
		Configuration config,
		ClassLoader classLoader,
		@Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

	checkNotNull(config, "config");
	checkNotNull(classLoader, "classLoader");

	// (1) an application-defined backend always wins
	if (fromApplication != null) {
		if (logger != null) {
			logger.info("Using application-defined state backend: {}", fromApplication);
		}

		// a non-configurable backend is handed back untouched
		if (!(fromApplication instanceof ConfigurableStateBackend)) {
			return fromApplication;
		}

		// a configurable backend additionally picks up parameters from the config
		if (logger != null) {
			logger.info("Configuring application-defined state backend with job/cluster config");
		}
		return ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
	}

	// (2) otherwise, see whether the config names a state backend
	final StateBackend configuredBackend = loadStateBackendFromConfig(config, classLoader, logger);
	if (configuredBackend != null) {
		return configuredBackend;
	}

	// (3) nothing configured: fall back to the default
	final StateBackend defaultBackend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
	if (logger != null) {
		logger.info("No state backend has been configured, using default (Memory / JobManager) {}", defaultBackend);
	}
	return defaultBackend;
}
 
示例8
/**
 * Resolves the state backend to use, trying three sources in order of precedence:
 *
 * <ol>
 *   <li>the application-defined state backend, if one is given,</li>
 *   <li>the state backend loaded from the configuration via
 *       {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} (parameter
 *       'state.backend', as defined in {@link CheckpointingOptions#STATE_BACKEND}),</li>
 *   <li>a default backend created through {@link MemoryStateBackendFactory}.</li>
 * </ol>
 *
 * <p>If an application-defined state backend is given and it is a
 * {@link ConfigurableStateBackend}, this method calls
 * {@link ConfigurableStateBackend#configure(Configuration, ClassLoader)} on it and returns the
 * result. Any other application-defined backend is returned unchanged.
 *
 * @param fromApplication The state backend defined by the application, or null if none was defined
 * @param config The configuration to load the state backend from (must not be null)
 * @param classLoader The class loader that should be used to load the state backend (must not be null)
 * @param logger Optionally, a logger to log actions to (may be null)
 *
 * @return The instantiated state backend.
 *
 * @throws DynamicCodeLoadingException
 *             Thrown if a state backend factory is configured and the factory class was not
 *             found or the factory could not be instantiated
 * @throws IllegalConfigurationException
 *             May be thrown by the StateBackendFactory when creating / configuring the state
 *             backend in the factory
 * @throws IOException
 *             May be thrown by the StateBackendFactory when instantiating the state backend
 */
public static StateBackend fromApplicationOrConfigOrDefault(
		@Nullable StateBackend fromApplication,
		Configuration config,
		ClassLoader classLoader,
		@Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

	checkNotNull(config, "config");
	checkNotNull(classLoader, "classLoader");

	// (1) an application-defined backend always wins
	if (fromApplication != null) {
		if (logger != null) {
			logger.info("Using application-defined state backend: {}", fromApplication);
		}

		// a non-configurable backend is handed back untouched
		if (!(fromApplication instanceof ConfigurableStateBackend)) {
			return fromApplication;
		}

		// a configurable backend additionally picks up parameters from the config
		if (logger != null) {
			logger.info("Configuring application-defined state backend with job/cluster config");
		}
		return ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
	}

	// (2) otherwise, see whether the config names a state backend
	final StateBackend configuredBackend = loadStateBackendFromConfig(config, classLoader, logger);
	if (configuredBackend != null) {
		return configuredBackend;
	}

	// (3) nothing configured: fall back to the default
	final StateBackend defaultBackend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
	if (logger != null) {
		logger.info("No state backend has been configured, using default (Memory / JobManager) {}", defaultBackend);
	}
	return defaultBackend;
}
 
示例9
/**
 * Resolves the state backend to use, trying three sources in order of precedence:
 *
 * <ol>
 *   <li>the application-defined state backend, if one is given,</li>
 *   <li>the state backend loaded from the configuration via
 *       {@link #loadStateBackendFromConfig(ReadableConfig, ClassLoader, Logger)} (parameter
 *       'state.backend', as defined in {@link CheckpointingOptions#STATE_BACKEND}),</li>
 *   <li>a default backend created through {@link MemoryStateBackendFactory}.</li>
 * </ol>
 *
 * <p>If an application-defined state backend is given and it is a
 * {@link ConfigurableStateBackend}, this method calls
 * {@link ConfigurableStateBackend#configure(ReadableConfig, ClassLoader)} on it and uses the
 * result. Any other application-defined backend is used unchanged. In both cases the backend
 * that is finally returned is the one written to the log.
 *
 * @param fromApplication The state backend defined by the application, or null if none was defined
 * @param config The configuration to load the state backend from (must not be null)
 * @param classLoader The class loader that should be used to load the state backend (must not be null)
 * @param logger Optionally, a logger to log actions to (may be null)
 *
 * @return The instantiated state backend.
 *
 * @throws DynamicCodeLoadingException
 *             Thrown if a state backend factory is configured and the factory class was not
 *             found or the factory could not be instantiated
 * @throws IllegalConfigurationException
 *             May be thrown by the StateBackendFactory when creating / configuring the state
 *             backend in the factory
 * @throws IOException
 *             May be thrown by the StateBackendFactory when instantiating the state backend
 */
public static StateBackend fromApplicationOrConfigOrDefault(
		@Nullable StateBackend fromApplication,
		Configuration config,
		ClassLoader classLoader,
		@Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

	checkNotNull(config, "config");
	checkNotNull(classLoader, "classLoader");

	// (1) an application-defined backend always wins
	if (fromApplication != null) {
		final StateBackend applicationBackend;

		if (!(fromApplication instanceof ConfigurableStateBackend)) {
			// not configurable: use exactly what the application supplied
			applicationBackend = fromApplication;
		}
		else {
			// configurable: let it pick up additional parameters from the config
			if (logger != null) {
				logger.info("Using job/cluster config to configure application-defined state backend: {}", fromApplication);
			}
			applicationBackend = ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
		}

		// report the backend that is actually going to be used
		if (logger != null) {
			logger.info("Using application-defined state backend: {}", applicationBackend);
		}
		return applicationBackend;
	}

	// (2) otherwise, see whether the config names a state backend
	final StateBackend configuredBackend = loadStateBackendFromConfig(config, classLoader, logger);
	if (configuredBackend != null) {
		return configuredBackend;
	}

	// (3) nothing configured: fall back to the default
	final StateBackend defaultBackend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
	if (logger != null) {
		logger.info("No state backend has been configured, using default (Memory / JobManager) {}", defaultBackend);
	}
	return defaultBackend;
}