Java源码示例:org.apache.flink.runtime.state.memory.MemoryStateBackendFactory
示例1
/**
 * Verifies that the cluster configuration alone is enough to select the memory state backend.
 *
 * <p>Two spellings of the backend option are exercised: the literal name {@code "jobmanager"}
 * and the fully qualified class name of {@link MemoryStateBackendFactory}. Both are expected
 * to yield a {@link MemoryStateBackend}.
 */
@Test
public void testLoadMemoryStateBackendNoParameters() throws Exception {
    // The name is deliberately written out as a literal (instead of going through
    // AbstractStateBackend#X_STATE_BACKEND_NAME) so that a config-breaking rename is caught here.
    final Configuration nameBasedConfig = new Configuration();
    nameBasedConfig.setString(backendKey, "jobmanager");

    final Configuration factoryBasedConfig = new Configuration();
    factoryBasedConfig.setString(backendKey, MemoryStateBackendFactory.class.getName());

    // Load both backends first, then check them, so a loading error surfaces before any assertion.
    final StateBackend loadedByName =
        StateBackendLoader.loadStateBackendFromConfig(nameBasedConfig, cl, null);
    final StateBackend loadedByFactory =
        StateBackendLoader.loadStateBackendFromConfig(factoryBasedConfig, cl, null);

    assertTrue(loadedByName instanceof MemoryStateBackend);
    assertTrue(loadedByFactory instanceof MemoryStateBackend);
}
示例2
/**
 * Verifies that the cluster configuration alone is enough to select the memory state backend.
 *
 * <p>Two spellings of the backend option are exercised: the literal name {@code "jobmanager"}
 * and the fully qualified class name of {@link MemoryStateBackendFactory}. Both are expected
 * to yield a {@link MemoryStateBackend}.
 */
@Test
public void testLoadMemoryStateBackendNoParameters() throws Exception {
    // The name is deliberately written out as a literal (instead of going through
    // AbstractStateBackend#X_STATE_BACKEND_NAME) so that a config-breaking rename is caught here.
    final Configuration nameBasedConfig = new Configuration();
    nameBasedConfig.setString(backendKey, "jobmanager");

    final Configuration factoryBasedConfig = new Configuration();
    factoryBasedConfig.setString(backendKey, MemoryStateBackendFactory.class.getName());

    // Load both backends first, then check them, so a loading error surfaces before any assertion.
    final StateBackend loadedByName =
        StateBackendLoader.loadStateBackendFromConfig(nameBasedConfig, cl, null);
    final StateBackend loadedByFactory =
        StateBackendLoader.loadStateBackendFromConfig(factoryBasedConfig, cl, null);

    assertTrue(loadedByName instanceof MemoryStateBackend);
    assertTrue(loadedByFactory instanceof MemoryStateBackend);
}
示例3
/**
 * Verifies that the cluster configuration alone is enough to select the memory state backend.
 *
 * <p>Two spellings of the backend option are exercised: the literal name {@code "jobmanager"}
 * and the fully qualified class name of {@link MemoryStateBackendFactory}. Both are expected
 * to yield a {@link MemoryStateBackend}.
 */
@Test
public void testLoadMemoryStateBackendNoParameters() throws Exception {
    // The name is deliberately written out as a literal (instead of going through
    // AbstractStateBackend#X_STATE_BACKEND_NAME) so that a config-breaking rename is caught here.
    final Configuration nameBasedConfig = new Configuration();
    nameBasedConfig.setString(backendKey, "jobmanager");

    final Configuration factoryBasedConfig = new Configuration();
    factoryBasedConfig.setString(backendKey, MemoryStateBackendFactory.class.getName());

    // Load both backends first, then check them, so a loading error surfaces before any assertion.
    final StateBackend loadedByName =
        StateBackendLoader.loadStateBackendFromConfig(nameBasedConfig, cl, null);
    final StateBackend loadedByFactory =
        StateBackendLoader.loadStateBackendFromConfig(factoryBasedConfig, cl, null);

    assertTrue(loadedByName instanceof MemoryStateBackend);
    assertTrue(loadedByFactory instanceof MemoryStateBackend);
}
示例4
/**
 * Verifies that a memory state backend selected through the cluster configuration also picks
 * up the checkpoint directory, the savepoint directory, and the asynchronous-snapshots flag
 * set in that configuration.
 *
 * <p>The backend is selected once by the literal name {@code "jobmanager"} and once by the
 * fully qualified class name of {@link MemoryStateBackendFactory}; both results are checked
 * against the same expected values.
 */
@Test
public void testLoadMemoryStateWithParameters() throws Exception {
    final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
    final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
    final Path wantedCheckpointPath = new Path(checkpointDir);
    final Path wantedSavepointPath = new Path(savepointDir);

    // Flip the option's default so the assertions below can only pass if the configured
    // value was actually applied.
    final boolean nonDefaultAsync = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();

    // The name is deliberately written out as a literal (instead of going through
    // AbstractStateBackend#X_STATE_BACKEND_NAME) so that a config-breaking rename is caught here.
    final Configuration nameBasedConfig = new Configuration();
    nameBasedConfig.setString(backendKey, "jobmanager");
    nameBasedConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
    nameBasedConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
    nameBasedConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, nonDefaultAsync);

    final Configuration factoryBasedConfig = new Configuration();
    factoryBasedConfig.setString(backendKey, MemoryStateBackendFactory.class.getName());
    factoryBasedConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
    factoryBasedConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
    factoryBasedConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, nonDefaultAsync);

    // The casts double as a type check: a different backend type fails the test right here.
    final MemoryStateBackend loadedByName = (MemoryStateBackend)
        StateBackendLoader.loadStateBackendFromConfig(nameBasedConfig, cl, null);
    final MemoryStateBackend loadedByFactory = (MemoryStateBackend)
        StateBackendLoader.loadStateBackendFromConfig(factoryBasedConfig, cl, null);

    assertNotNull(loadedByName);
    assertNotNull(loadedByFactory);

    assertEquals(wantedCheckpointPath, loadedByName.getCheckpointPath());
    assertEquals(wantedCheckpointPath, loadedByFactory.getCheckpointPath());

    assertEquals(wantedSavepointPath, loadedByName.getSavepointPath());
    assertEquals(wantedSavepointPath, loadedByFactory.getSavepointPath());

    assertEquals(nonDefaultAsync, loadedByName.isUsingAsynchronousSnapshots());
    assertEquals(nonDefaultAsync, loadedByFactory.isUsingAsynchronousSnapshots());
}
示例5
/**
 * Verifies that a memory state backend selected through the cluster configuration also picks
 * up the checkpoint directory, the savepoint directory, and the asynchronous-snapshots flag
 * set in that configuration.
 *
 * <p>The backend is selected once by the literal name {@code "jobmanager"} and once by the
 * fully qualified class name of {@link MemoryStateBackendFactory}; both results are checked
 * against the same expected values.
 */
@Test
public void testLoadMemoryStateWithParameters() throws Exception {
    final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
    final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
    final Path wantedCheckpointPath = new Path(checkpointDir);
    final Path wantedSavepointPath = new Path(savepointDir);

    // Flip the option's default so the assertions below can only pass if the configured
    // value was actually applied.
    final boolean nonDefaultAsync = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();

    // The name is deliberately written out as a literal (instead of going through
    // AbstractStateBackend#X_STATE_BACKEND_NAME) so that a config-breaking rename is caught here.
    final Configuration nameBasedConfig = new Configuration();
    nameBasedConfig.setString(backendKey, "jobmanager");
    nameBasedConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
    nameBasedConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
    nameBasedConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, nonDefaultAsync);

    final Configuration factoryBasedConfig = new Configuration();
    factoryBasedConfig.setString(backendKey, MemoryStateBackendFactory.class.getName());
    factoryBasedConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
    factoryBasedConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
    factoryBasedConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, nonDefaultAsync);

    // The casts double as a type check: a different backend type fails the test right here.
    final MemoryStateBackend loadedByName = (MemoryStateBackend)
        StateBackendLoader.loadStateBackendFromConfig(nameBasedConfig, cl, null);
    final MemoryStateBackend loadedByFactory = (MemoryStateBackend)
        StateBackendLoader.loadStateBackendFromConfig(factoryBasedConfig, cl, null);

    assertNotNull(loadedByName);
    assertNotNull(loadedByFactory);

    assertEquals(wantedCheckpointPath, loadedByName.getCheckpointPath());
    assertEquals(wantedCheckpointPath, loadedByFactory.getCheckpointPath());

    assertEquals(wantedSavepointPath, loadedByName.getSavepointPath());
    assertEquals(wantedSavepointPath, loadedByFactory.getSavepointPath());

    assertEquals(nonDefaultAsync, loadedByName.isUsingAsynchronousSnapshots());
    assertEquals(nonDefaultAsync, loadedByFactory.isUsingAsynchronousSnapshots());
}
示例6
/**
 * Verifies that a memory state backend selected through the cluster configuration also picks
 * up the checkpoint directory, the savepoint directory, and the asynchronous-snapshots flag
 * set in that configuration.
 *
 * <p>The backend is selected once by the literal name {@code "jobmanager"} and once by the
 * fully qualified class name of {@link MemoryStateBackendFactory}; both results are checked
 * against the same expected values.
 */
@Test
public void testLoadMemoryStateWithParameters() throws Exception {
    final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
    final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
    final Path wantedCheckpointPath = new Path(checkpointDir);
    final Path wantedSavepointPath = new Path(savepointDir);

    // Flip the option's default so the assertions below can only pass if the configured
    // value was actually applied.
    final boolean nonDefaultAsync = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();

    // The name is deliberately written out as a literal (instead of going through
    // AbstractStateBackend#X_STATE_BACKEND_NAME) so that a config-breaking rename is caught here.
    final Configuration nameBasedConfig = new Configuration();
    nameBasedConfig.setString(backendKey, "jobmanager");
    nameBasedConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
    nameBasedConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
    nameBasedConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, nonDefaultAsync);

    final Configuration factoryBasedConfig = new Configuration();
    factoryBasedConfig.setString(backendKey, MemoryStateBackendFactory.class.getName());
    factoryBasedConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
    factoryBasedConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
    factoryBasedConfig.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, nonDefaultAsync);

    // The casts double as a type check: a different backend type fails the test right here.
    final MemoryStateBackend loadedByName = (MemoryStateBackend)
        StateBackendLoader.loadStateBackendFromConfig(nameBasedConfig, cl, null);
    final MemoryStateBackend loadedByFactory = (MemoryStateBackend)
        StateBackendLoader.loadStateBackendFromConfig(factoryBasedConfig, cl, null);

    assertNotNull(loadedByName);
    assertNotNull(loadedByFactory);

    assertEquals(wantedCheckpointPath, loadedByName.getCheckpointPath());
    assertEquals(wantedCheckpointPath, loadedByFactory.getCheckpointPath());

    assertEquals(wantedSavepointPath, loadedByName.getSavepointPath());
    assertEquals(wantedSavepointPath, loadedByFactory.getSavepointPath());

    assertEquals(nonDefaultAsync, loadedByName.isUsingAsynchronousSnapshots());
    assertEquals(nonDefaultAsync, loadedByFactory.isUsingAsynchronousSnapshots());
}
示例7
/**
 * Resolves the state backend to use, trying three sources in order of precedence:
 * the application-defined backend, the backend configured under the parameter
 * 'state.backend' (see {@link CheckpointingOptions#STATE_BACKEND}), and finally the
 * default state backend (the {@link MemoryStateBackend}).
 *
 * <p>If an application-defined state backend is given and it is a
 * {@link ConfigurableStateBackend}, this method calls
 * {@link ConfigurableStateBackend#configure(Configuration, ClassLoader)} on it and returns
 * the result. Any other application-defined backend is returned unchanged.
 *
 * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on
 * how the state backend is loaded from the configuration.
 *
 * @param fromApplication The state backend defined by the application, or null if none was defined
 * @param config The configuration to load the state backend from
 * @param classLoader The class loader that should be used to load the state backend
 * @param logger Optionally, a logger to log actions to (may be null)
 *
 * @return The instantiated state backend.
 *
 * @throws DynamicCodeLoadingException
 *             Thrown if a state backend factory is configured and the factory class was not
 *             found or the factory could not be instantiated
 * @throws IllegalConfigurationException
 *             May be thrown by the StateBackendFactory when creating / configuring the state
 *             backend in the factory
 * @throws IOException
 *             May be thrown by the StateBackendFactory when instantiating the state backend
 */
public static StateBackend fromApplicationOrConfigOrDefault(
        @Nullable StateBackend fromApplication,
        Configuration config,
        ClassLoader classLoader,
        @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

    checkNotNull(config, "config");
    checkNotNull(classLoader, "classLoader");

    if (fromApplication == null) {
        // No application-defined backend: fall through to (2) the configuration ...
        final StateBackend configuredBackend = loadStateBackendFromConfig(config, classLoader, logger);
        if (configuredBackend != null) {
            return configuredBackend;
        }

        // ... and then to (3) the default.
        final StateBackend defaultBackend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
        if (logger != null) {
            logger.info("No state backend has been configured, using default (Memory / JobManager) {}", defaultBackend);
        }
        return defaultBackend;
    }

    // (1) An application-defined backend takes precedence over everything else.
    if (logger != null) {
        logger.info("Using application-defined state backend: {}", fromApplication);
    }

    if (!(fromApplication instanceof ConfigurableStateBackend)) {
        // Not configurable, so it is used exactly as the application supplied it.
        return fromApplication;
    }

    // Configurable backends get the chance to pick up additional configuration parameters.
    if (logger != null) {
        logger.info("Configuring application-defined state backend with job/cluster config");
    }
    return ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
}
示例8
/**
 * Resolves the state backend to use, trying three sources in order of precedence:
 * the application-defined backend, the backend configured under the parameter
 * 'state.backend' (see {@link CheckpointingOptions#STATE_BACKEND}), and finally the
 * default state backend (the {@link MemoryStateBackend}).
 *
 * <p>If an application-defined state backend is given and it is a
 * {@link ConfigurableStateBackend}, this method calls
 * {@link ConfigurableStateBackend#configure(Configuration, ClassLoader)} on it and returns
 * the result. Any other application-defined backend is returned unchanged.
 *
 * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on
 * how the state backend is loaded from the configuration.
 *
 * @param fromApplication The state backend defined by the application, or null if none was defined
 * @param config The configuration to load the state backend from
 * @param classLoader The class loader that should be used to load the state backend
 * @param logger Optionally, a logger to log actions to (may be null)
 *
 * @return The instantiated state backend.
 *
 * @throws DynamicCodeLoadingException
 *             Thrown if a state backend factory is configured and the factory class was not
 *             found or the factory could not be instantiated
 * @throws IllegalConfigurationException
 *             May be thrown by the StateBackendFactory when creating / configuring the state
 *             backend in the factory
 * @throws IOException
 *             May be thrown by the StateBackendFactory when instantiating the state backend
 */
public static StateBackend fromApplicationOrConfigOrDefault(
        @Nullable StateBackend fromApplication,
        Configuration config,
        ClassLoader classLoader,
        @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

    checkNotNull(config, "config");
    checkNotNull(classLoader, "classLoader");

    if (fromApplication == null) {
        // No application-defined backend: fall through to (2) the configuration ...
        final StateBackend configuredBackend = loadStateBackendFromConfig(config, classLoader, logger);
        if (configuredBackend != null) {
            return configuredBackend;
        }

        // ... and then to (3) the default.
        final StateBackend defaultBackend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
        if (logger != null) {
            logger.info("No state backend has been configured, using default (Memory / JobManager) {}", defaultBackend);
        }
        return defaultBackend;
    }

    // (1) An application-defined backend takes precedence over everything else.
    if (logger != null) {
        logger.info("Using application-defined state backend: {}", fromApplication);
    }

    if (!(fromApplication instanceof ConfigurableStateBackend)) {
        // Not configurable, so it is used exactly as the application supplied it.
        return fromApplication;
    }

    // Configurable backends get the chance to pick up additional configuration parameters.
    if (logger != null) {
        logger.info("Configuring application-defined state backend with job/cluster config");
    }
    return ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
}
示例9
/**
 * Resolves the state backend to use, trying three sources in order of precedence:
 * the application-defined backend, the backend configured under the parameter
 * 'state.backend' (see {@link CheckpointingOptions#STATE_BACKEND}), and finally the
 * default state backend (the {@link MemoryStateBackend}).
 *
 * <p>If an application-defined state backend is given and it is a
 * {@link ConfigurableStateBackend}, this method calls
 * {@link ConfigurableStateBackend#configure(ReadableConfig, ClassLoader)} on it and uses
 * the result. Any other application-defined backend is used unchanged.
 *
 * <p>Refer to {@link #loadStateBackendFromConfig(ReadableConfig, ClassLoader, Logger)} for details on
 * how the state backend is loaded from the configuration.
 *
 * @param fromApplication The state backend defined by the application, or null if none was defined
 * @param config The configuration to load the state backend from
 * @param classLoader The class loader that should be used to load the state backend
 * @param logger Optionally, a logger to log actions to (may be null)
 *
 * @return The instantiated state backend.
 *
 * @throws DynamicCodeLoadingException
 *             Thrown if a state backend factory is configured and the factory class was not
 *             found or the factory could not be instantiated
 * @throws IllegalConfigurationException
 *             May be thrown by the StateBackendFactory when creating / configuring the state
 *             backend in the factory
 * @throws IOException
 *             May be thrown by the StateBackendFactory when instantiating the state backend
 */
public static StateBackend fromApplicationOrConfigOrDefault(
        @Nullable StateBackend fromApplication,
        Configuration config,
        ClassLoader classLoader,
        @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

    checkNotNull(config, "config");
    checkNotNull(classLoader, "classLoader");

    if (fromApplication == null) {
        // No application-defined backend: fall through to (2) the configuration ...
        final StateBackend configuredBackend = loadStateBackendFromConfig(config, classLoader, logger);
        if (configuredBackend != null) {
            return configuredBackend;
        }

        // ... and then to (3) the default.
        final StateBackend defaultBackend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
        if (logger != null) {
            logger.info("No state backend has been configured, using default (Memory / JobManager) {}", defaultBackend);
        }
        return defaultBackend;
    }

    // (1) An application-defined backend takes precedence over everything else.
    final StateBackend effectiveBackend;
    if (fromApplication instanceof ConfigurableStateBackend) {
        // Configurable backends get the chance to pick up additional configuration parameters.
        if (logger != null) {
            logger.info("Using job/cluster config to configure application-defined state backend: {}", fromApplication);
        }
        effectiveBackend = ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
    } else {
        // Not configurable, so it is used exactly as the application supplied it.
        effectiveBackend = fromApplication;
    }

    // Logged after the optional configure step, so the message shows the backend actually returned.
    if (logger != null) {
        logger.info("Using application-defined state backend: {}", effectiveBackend);
    }
    return effectiveBackend;
}