Java源码示例:org.apache.flink.table.delegation.ExecutorFactory
示例1
/**
 * Creates a {@link TableEnvironmentImpl} wired from the given settings.
 *
 * <p>The catalog manager is seeded with a {@link GenericInMemoryCatalog} built from the
 * settings' built-in catalog and database names. The executor and the planner are produced by
 * factories looked up through {@link ComponentFactoryService}, using the settings' executor
 * and planner properties respectively.
 *
 * @param settings supplies the built-in catalog/database names, the executor and planner
 *     properties, and the streaming-mode flag
 * @return a table environment composed of the components created here
 */
public static TableEnvironmentImpl create(EnvironmentSettings settings) {
    // Catalog layer: a single in-memory catalog registered under the built-in catalog name.
    final GenericInMemoryCatalog builtInCatalog = new GenericInMemoryCatalog(
        settings.getBuiltInCatalogName(),
        settings.getBuiltInDatabaseName());
    final CatalogManager catalogs = new CatalogManager(settings.getBuiltInCatalogName(), builtInCatalog);
    final FunctionCatalog functions = new FunctionCatalog(catalogs);

    // Executor: discovered from the executor properties.
    final Map<String, String> executorProps = settings.toExecutorProperties();
    final ExecutorFactory executorFactory =
        ComponentFactoryService.find(ExecutorFactory.class, executorProps);
    final Executor exec = executorFactory.create(executorProps);

    final TableConfig config = new TableConfig();

    // Planner: discovered from the planner properties and handed the components built above.
    final Map<String, String> plannerProps = settings.toPlannerProperties();
    final PlannerFactory plannerFactory =
        ComponentFactoryService.find(PlannerFactory.class, plannerProps);
    final Planner tablePlanner =
        plannerFactory.create(plannerProps, exec, config, functions, catalogs);

    final boolean streaming = settings.isStreamingMode();
    return new TableEnvironmentImpl(catalogs, config, exec, functions, tablePlanner, streaming);
}
示例2
/**
 * Looks up an {@link ExecutorFactory} for the given properties and reflectively calls its
 * {@code create(Map, StreamExecutionEnvironment)} method.
 *
 * <p>NOTE(review): reflection is presumably used because this two-argument {@code create} is
 * not declared on the {@code ExecutorFactory} type itself; confirm against the factory
 * implementation.
 *
 * @param executorProperties properties used both to find the factory and as the first argument
 *     of the reflective call
 * @param executionEnvironment environment passed as the second argument of the reflective call
 * @return the object returned by the reflective call, cast to {@link Executor}
 * @throws TableException wrapping any exception raised during lookup, reflection or the cast
 */
private static Executor lookupExecutor(
        Map<String, String> executorProperties,
        StreamExecutionEnvironment executionEnvironment) {
    try {
        final ExecutorFactory factory =
            ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
        final Class<?> factoryType = factory.getClass();
        final Method factoryMethod =
            factoryType.getMethod("create", Map.class, StreamExecutionEnvironment.class);
        final Object created =
            factoryMethod.invoke(factory, executorProperties, executionEnvironment);
        // The cast stays inside the try so a wrong return type is also reported as TableException.
        return (Executor) created;
    } catch (Exception e) {
        throw new TableException(
            "Could not instantiate the executor. Make sure a planner module is on the classpath",
            e);
    }
}
示例3
/**
 * Resolves an executor by finding a matching {@link ExecutorFactory} and invoking, via
 * reflection, the public {@code create} method that accepts a {@link Map} and a
 * {@link StreamExecutionEnvironment}.
 *
 * @param executorProperties properties used to select the factory; also forwarded to
 *     {@code create}
 * @param executionEnvironment stream execution environment forwarded to {@code create}
 * @return the created {@link Executor}
 * @throws TableException if the factory lookup, the method lookup, the invocation or the cast
 *     throws any exception; the original exception is kept as the cause
 */
private static Executor lookupExecutor(
        Map<String, String> executorProperties,
        StreamExecutionEnvironment executionEnvironment) {
    try {
        final ExecutorFactory discovered =
            ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
        final Method creator = discovered
            .getClass()
            .getMethod("create", Map.class, StreamExecutionEnvironment.class);
        final Object result = creator.invoke(discovered, executorProperties, executionEnvironment);
        return (Executor) result;
    } catch (Exception e) {
        // Every failure mode is reported uniformly, with the underlying exception as the cause.
        throw new TableException(
            "Could not instantiate the executor. Make sure a planner module is on the classpath",
            e);
    }
}
示例4
/**
 * Obtains an {@link Executor} bound to the supplied execution environment.
 *
 * <p>Steps, all performed inside one try block:
 * <ol>
 *   <li>find an {@link ExecutorFactory} through {@link ComponentFactoryService};</li>
 *   <li>look up its public {@code create(Map, StreamExecutionEnvironment)} method on the
 *       factory's runtime class;</li>
 *   <li>invoke that method and cast the result to {@link Executor}.</li>
 * </ol>
 *
 * @param executorProperties factory selection properties, also passed to {@code create}
 * @param executionEnvironment environment passed to {@code create}
 * @return the executor returned by the factory
 * @throws TableException wrapping whatever exception any of the steps raised
 */
private static Executor lookupExecutor(
        Map<String, String> executorProperties,
        StreamExecutionEnvironment executionEnvironment) {
    try {
        // Step 1: factory discovery.
        final ExecutorFactory matchedFactory =
            ComponentFactoryService.find(ExecutorFactory.class, executorProperties);

        // Step 2: resolve the two-argument create method on the concrete factory class.
        final Class<?> concreteType = matchedFactory.getClass();
        final Method twoArgCreate =
            concreteType.getMethod("create", Map.class, StreamExecutionEnvironment.class);

        // Step 3: invoke and narrow the result.
        final Object instance =
            twoArgCreate.invoke(matchedFactory, executorProperties, executionEnvironment);
        return (Executor) instance;
    } catch (Exception e) {
        throw new TableException(
            "Could not instantiate the executor. Make sure a planner module is on the classpath",
            e);
    }
}
示例5
/**
 * Finds an {@link ExecutorFactory} matching {@code executorProperties} and uses reflection to
 * call its {@code create} method taking a {@link Map} and a {@link StreamExecutionEnvironment}.
 *
 * @param executorProperties properties passed to the factory lookup and to {@code create}
 * @param executionEnvironment execution environment passed to {@code create}
 * @return the value returned by {@code create}, cast to {@link Executor}
 * @throws TableException if anything inside the lookup/invocation sequence throws; the thrown
 *     exception is attached as the cause
 */
private static Executor lookupExecutor(
        Map<String, String> executorProperties,
        StreamExecutionEnvironment executionEnvironment) {
    try {
        final ExecutorFactory target =
            ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
        final Method createFn =
            target.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class);
        final Object executorObject =
            createFn.invoke(target, executorProperties, executionEnvironment);
        return (Executor) executorObject;
    } catch (Exception e) {
        throw new TableException(
            "Could not instantiate the executor. Make sure a planner module is on the classpath",
            e);
    }
}
示例6
/**
 * Creates an {@link Executor} by reflectively calling
 * {@code create(Map, StreamExecutionEnvironment)} on an {@link ExecutorFactory} found through
 * {@link ComponentFactoryService}.
 *
 * @param executorProperties properties that select the factory and are handed to {@code create}
 * @param executionEnvironment environment handed to {@code create}
 * @return the executor produced by the factory
 * @throws TableException for any exception raised while locating the factory, resolving the
 *     method, invoking it, or casting its result
 */
private static Executor lookupExecutor(
        Map<String, String> executorProperties,
        StreamExecutionEnvironment executionEnvironment) {
    try {
        final ExecutorFactory provider =
            ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
        final Class<?> providerClass = provider.getClass();
        final Method createHandle =
            providerClass.getMethod("create", Map.class, StreamExecutionEnvironment.class);
        // Result is narrowed inside the try block so a failed cast is wrapped as well.
        final Object produced =
            createHandle.invoke(provider, executorProperties, executionEnvironment);
        return (Executor) produced;
    } catch (Exception e) {
        throw new TableException(
            "Could not instantiate the executor. Make sure a planner module is on the classpath",
            e);
    }
}
示例7
/**
 * Creates a {@link TableEnvironmentImpl} wired from the given settings.
 *
 * <p>A fresh {@link TableConfig} and {@link ModuleManager} are created. The
 * {@link CatalogManager} is built with the current thread's context class loader, the table
 * config's configuration, and a {@link GenericInMemoryCatalog} as the default catalog. The
 * executor and the planner come from factories looked up through
 * {@link ComponentFactoryService}.
 *
 * @param settings supplies the built-in catalog/database names, the executor and planner
 *     properties, and the streaming-mode flag
 * @return a table environment composed of the components created here
 */
public static TableEnvironmentImpl create(EnvironmentSettings settings) {
    // temporary solution until FLINK-15635 is fixed
    final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();

    final TableConfig config = new TableConfig();
    final ModuleManager modules = new ModuleManager();

    // Catalog layer: one in-memory catalog registered as the default.
    final GenericInMemoryCatalog builtInCatalog = new GenericInMemoryCatalog(
        settings.getBuiltInCatalogName(),
        settings.getBuiltInDatabaseName());
    final CatalogManager catalogs = CatalogManager.newBuilder()
        .classLoader(contextLoader)
        .config(config.getConfiguration())
        .defaultCatalog(settings.getBuiltInCatalogName(), builtInCatalog)
        .build();

    final FunctionCatalog functions = new FunctionCatalog(config, catalogs, modules);

    // Executor: discovered from the executor properties.
    final Map<String, String> executorProps = settings.toExecutorProperties();
    final ExecutorFactory executorFactory =
        ComponentFactoryService.find(ExecutorFactory.class, executorProps);
    final Executor exec = executorFactory.create(executorProps);

    // Planner: discovered from the planner properties and handed the components built above.
    final Map<String, String> plannerProps = settings.toPlannerProperties();
    final PlannerFactory plannerFactory =
        ComponentFactoryService.find(PlannerFactory.class, plannerProps);
    final Planner tablePlanner =
        plannerFactory.create(plannerProps, exec, config, functions, catalogs);

    final boolean streaming = settings.isStreamingMode();
    return new TableEnvironmentImpl(
        catalogs, modules, config, exec, functions, tablePlanner, streaming);
}