Java源码示例:org.apache.flink.table.catalog.GenericInMemoryCatalog
示例1
/**
 * Demo entry point: creates a streaming table environment on the Blink planner
 * (parallelism 1) and registers an in-memory catalog under the name "zhisheng".
 *
 * @param args command-line arguments; not read by this method
 */
public static void main(String[] args) {
    final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setParallelism(1);

    final EnvironmentSettings streamSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);

    // GenericInMemoryCatalog is the default catalog implementation.
    final GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog("zhisheng");
    tableEnv.registerCatalog("zhisheng", inMemoryCatalog);

    // A HiveCatalog can be registered instead; it requires the Hive connector and the Hive dependencies:
    // tableEnv.registerCatalog("zhisheng", new HiveCatalog("zhisheng", "zhisheng", "~/zhisheng/hive/conf", "2.3.4"));
}
示例2
/**
 * Assembles a {@link TableEnvironmentImpl} from the given settings.
 *
 * <p>The built-in catalog is a {@link GenericInMemoryCatalog} named after
 * {@code settings.getBuiltInCatalogName()}; the executor and the planner are discovered
 * through {@link ComponentFactoryService} using the properties derived from the settings.
 *
 * @param settings source of the built-in catalog/database names, the executor and planner
 *                 properties and the streaming-mode flag
 * @return the newly created table environment
 */
public static TableEnvironmentImpl create(EnvironmentSettings settings) {
    // Catalog layer.
    final String builtInCatalogName = settings.getBuiltInCatalogName();
    final GenericInMemoryCatalog builtInCatalog = new GenericInMemoryCatalog(
            settings.getBuiltInCatalogName(),
            settings.getBuiltInDatabaseName());
    final CatalogManager catalogs = new CatalogManager(builtInCatalogName, builtInCatalog);
    final FunctionCatalog functions = new FunctionCatalog(catalogs);

    // Executor lookup.
    final Map<String, String> executorProps = settings.toExecutorProperties();
    final ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProps);
    final Executor exec = executorFactory.create(executorProps);

    final TableConfig config = new TableConfig();

    // Planner lookup.
    final Map<String, String> plannerProps = settings.toPlannerProperties();
    final PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, plannerProps);
    final Planner plan = plannerFactory.create(plannerProps, exec, config, functions, catalogs);

    return new TableEnvironmentImpl(catalogs, config, exec, functions, plan, settings.isStreamingMode());
}
示例3
/**
 * Sample program: sets up a Blink-planner streaming {@link StreamTableEnvironment}
 * with parallelism 1 and registers a {@link GenericInMemoryCatalog} named "zhisheng".
 *
 * @param args command-line arguments; not read by this method
 */
public static void main(String[] args) {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setParallelism(1);

    final EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    final StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(execEnv, blinkSettings);

    // GenericInMemoryCatalog: the default catalog.
    final GenericInMemoryCatalog defaultStyleCatalog = new GenericInMemoryCatalog("zhisheng");
    streamTableEnv.registerCatalog("zhisheng", defaultStyleCatalog);

    // HiveCatalog: needs the Hive connector and the Hive dependencies on the classpath, e.g.
    // streamTableEnv.registerCatalog("zhisheng", new HiveCatalog("zhisheng", "zhisheng", "~/zhisheng/hive/conf", "2.3.4"));
}
示例4
/**
 * Verifies the plan of a Table API tumbling-window count over the {@code rowtime} column of a
 * {@link CustomCatalogTable} that is served by a registered {@link GenericInMemoryCatalog}.
 */
@Test
public void testResolvingSchemaOfCustomCatalogTableTableApi() throws Exception {
    final TableTestUtil util = getTestUtil();
    final TableEnvironment tEnv = util.getTableEnv();

    // Put the custom table into an in-memory catalog and expose it as "testCatalog".
    final GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("in-memory");
    final ObjectPath tablePath = new ObjectPath("default", "testTable");
    catalog.createTable(tablePath, new CustomCatalogTable(isStreamingMode), false);
    tEnv.registerCatalog("testCatalog", catalog);

    final Table source = tEnv.from("testCatalog.`default`.testTable");
    final Table windowedCount = source
            .window(Tumble.over(lit(10).minute()).on($("rowtime")).as("w"))
            .groupBy($("w"))
            .select(lit(1).count());

    util.verifyPlan(windowedCount);
}
示例5
/**
 * Verifies the plan of a SQL tumbling-window count over the {@code proctime} column of a
 * {@link CustomCatalogTable} registered through a {@link GenericInMemoryCatalog}.
 * Returns immediately when the test is not running in streaming mode.
 */
@Test
public void testResolvingProctimeOfCustomTableSql() throws Exception {
    if (!isStreamingMode) {
        // Processing time is not supported in batch mode, so there is nothing to verify.
        return;
    }

    final TableTestUtil util = getTestUtil();
    final TableEnvironment tEnv = util.getTableEnv();

    final GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("in-memory");
    final ObjectPath tablePath = new ObjectPath("default", "testTable");
    catalog.createTable(tablePath, new CustomCatalogTable(isStreamingMode), false);
    tEnv.registerCatalog("testCatalog", catalog);

    final String query = "SELECT COUNT(*) FROM testCatalog.`default`.testTable " +
            "GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE)";
    util.verifyPlan(query);
}
示例6
/**
 * Verifies the plan of a Table API tumbling-window count over the {@code proctime} column of a
 * {@link CustomCatalogTable} registered through a {@link GenericInMemoryCatalog}.
 * Returns immediately when the test is not running in streaming mode.
 */
@Test
public void testResolvingProctimeOfCustomTableTableApi() throws Exception {
    if (!isStreamingMode) {
        // Processing time is not supported in batch mode, so there is nothing to verify.
        return;
    }

    final TableTestUtil util = getTestUtil();
    final TableEnvironment tEnv = util.getTableEnv();

    final GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("in-memory");
    final ObjectPath tablePath = new ObjectPath("default", "testTable");
    catalog.createTable(tablePath, new CustomCatalogTable(isStreamingMode), false);
    tEnv.registerCatalog("testCatalog", catalog);

    final Table source = tEnv.from("testCatalog.`default`.testTable");
    final Table windowedCount = source
            .window(Tumble.over(lit(10).minute()).on($("proctime")).as("w"))
            .groupBy($("w"))
            .select(lit(1).count());

    util.verifyPlan(windowedCount);
}
示例7
/**
 * Expects a {@link ValidationException} when a primary key constraint is added to table
 * {@code tb1} by an {@code ALTER TABLE} statement that does not say {@code NOT ENFORCED}.
 */
@Test
public void testAlterTableAddPkConstraintEnforced() throws Exception {
    // Catalog "cat1" with database "db1".
    final Catalog inMemoryCatalog = new GenericInMemoryCatalog("default", "default");
    catalogManager.registerCatalog("cat1", inMemoryCatalog);
    inMemoryCatalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);

    // Table "tb1" with two NOT NULL columns (a, b) and one nullable column (c).
    final TableSchema schema = TableSchema.builder()
            .field("a", DataTypes.STRING().notNull())
            .field("b", DataTypes.BIGINT().notNull())
            .field("c", DataTypes.BIGINT())
            .build();
    final CatalogTable table = new CatalogTableImpl(schema, new HashMap<>(), "tb1");

    catalogManager.setCurrentCatalog("cat1");
    catalogManager.setCurrentDatabase("db1");
    inMemoryCatalog.createTable(new ObjectPath("db1", "tb1"), table, true);

    // The text is kept verbatim (spelling included); presumably it has to match the
    // message raised by the code under test.
    final String expectedMessage = "Flink doesn't support ENFORCED mode for PRIMARY KEY constaint. "
            + "ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the "
            + "incoming/outgoing data. Flink does not own the data therefore the "
            + "only supported mode is the NOT ENFORCED mode";
    thrown.expect(ValidationException.class);
    thrown.expectMessage(expectedMessage);

    parse("alter table tb1 add constraint ct1 primary key(a, b)", SqlDialect.DEFAULT);
}
示例8
/**
 * Expects an {@link UnsupportedOperationException} when a {@code UNIQUE ... NOT ENFORCED}
 * constraint is added to table {@code tb1} through {@code ALTER TABLE}.
 */
@Test
public void testAlterTableAddUniqueConstraint() throws Exception {
    // Catalog "cat1" with database "db1".
    final Catalog inMemoryCatalog = new GenericInMemoryCatalog("default", "default");
    catalogManager.registerCatalog("cat1", inMemoryCatalog);
    inMemoryCatalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);

    // Table "tb1" with two NOT NULL columns (a, b).
    final TableSchema schema = TableSchema.builder()
            .field("a", DataTypes.STRING().notNull())
            .field("b", DataTypes.BIGINT().notNull())
            .build();
    final CatalogTable table = new CatalogTableImpl(schema, new HashMap<>(), "tb1");

    catalogManager.setCurrentCatalog("cat1");
    catalogManager.setCurrentDatabase("db1");
    inMemoryCatalog.createTable(new ObjectPath("db1", "tb1"), table, true);

    // Adding a unique constraint via ALTER TABLE must be rejected.
    thrown.expect(UnsupportedOperationException.class);
    thrown.expectMessage("UNIQUE constraint is not supported yet");

    parse("alter table tb1 add constraint ct1 unique(a, b) not enforced", SqlDialect.DEFAULT);
}
示例9
/**
 * Expects an {@link UnsupportedOperationException} when a {@code UNIQUE} constraint without
 * {@code NOT ENFORCED} is added to table {@code tb1} through {@code ALTER TABLE}.
 */
@Test
public void testAlterTableAddUniqueConstraintEnforced() throws Exception {
    // Catalog "cat1" with database "db1".
    final Catalog inMemoryCatalog = new GenericInMemoryCatalog("default", "default");
    catalogManager.registerCatalog("cat1", inMemoryCatalog);
    inMemoryCatalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);

    // Table "tb1" with two NOT NULL columns (a, b) and one nullable column (c).
    final TableSchema schema = TableSchema.builder()
            .field("a", DataTypes.STRING().notNull())
            .field("b", DataTypes.BIGINT().notNull())
            .field("c", DataTypes.BIGINT())
            .build();
    final CatalogTable table = new CatalogTableImpl(schema, new HashMap<>(), "tb1");

    catalogManager.setCurrentCatalog("cat1");
    catalogManager.setCurrentDatabase("db1");
    inMemoryCatalog.createTable(new ObjectPath("db1", "tb1"), table, true);

    // The statement omits NOT ENFORCED; the unique constraint must still be rejected.
    thrown.expect(UnsupportedOperationException.class);
    thrown.expectMessage("UNIQUE constraint is not supported yet");

    parse("alter table tb1 add constraint ct1 unique(a, b)", SqlDialect.DEFAULT);
}
示例10
/**
 * Creates a {@link CatalogManager} whose built-in catalog is a {@link GenericInMemoryCatalog}
 * using the default catalog and database names from {@link EnvironmentSettings}.
 *
 * @return a new catalog manager
 */
private static CatalogManager createCatalogManager() {
    final String catalogName = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
    final GenericInMemoryCatalog builtInCatalog = new GenericInMemoryCatalog(
            EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
            EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
    return new CatalogManager(catalogName, builtInCatalog);
}
示例11
/**
 * Creates a {@link StreamTableEnvironment} on top of the given execution environment.
 *
 * @param executionEnvironment the streaming execution environment to bind to
 * @param settings source of the built-in catalog/database names and of the executor and
 *                 planner properties; must be in streaming mode
 * @param tableConfig configuration handed to the planner and the table environment
 * @return the newly created stream table environment
 * @throws TableException if {@code settings} is not in streaming mode
 */
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {

    final boolean streaming = settings.isStreamingMode();
    if (!streaming) {
        throw new TableException(
                "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
    }

    // Catalog layer.
    final String builtInCatalogName = settings.getBuiltInCatalogName();
    final GenericInMemoryCatalog builtInCatalog = new GenericInMemoryCatalog(
            settings.getBuiltInCatalogName(),
            settings.getBuiltInDatabaseName());
    final CatalogManager catalogs = new CatalogManager(builtInCatalogName, builtInCatalog);
    final FunctionCatalog functions = new FunctionCatalog(catalogs);

    // Executor and planner.
    final Map<String, String> executorProps = settings.toExecutorProperties();
    final Executor exec = lookupExecutor(executorProps, executionEnvironment);

    final Map<String, String> plannerProps = settings.toPlannerProperties();
    final Planner plan = ComponentFactoryService
            .find(PlannerFactory.class, plannerProps)
            .create(plannerProps, exec, tableConfig, functions, catalogs);

    return new StreamTableEnvironmentImpl(
            catalogs, functions, tableConfig, executionEnvironment, plan, exec, settings.isStreamingMode());
}
示例12
/**
 * Builds a {@link StreamTableEnvironmentImpl} wired with an in-memory catalog "cat"
 * (database "db"), a fresh {@link TableConfig}, a {@link TestPlanner} over the transformation
 * of {@code elements}, and this instance's {@code executor}.
 *
 * @param env the stream execution environment to attach
 * @param elements source whose transformation is handed to the {@link TestPlanner}
 * @return the assembled table environment
 */
private StreamTableEnvironmentImpl getStreamTableEnvironment(
        StreamExecutionEnvironment env,
        DataStreamSource<Integer> elements) {
    final GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog("cat", "db");
    final CatalogManager manager = new CatalogManager("cat", inMemoryCatalog);
    final FunctionCatalog functions = new FunctionCatalog(manager);
    final TableConfig config = new TableConfig();
    final TestPlanner planner = new TestPlanner(elements.getTransformation());

    // The trailing flag occupies the streaming-mode position of the constructor (presumably; confirm).
    return new StreamTableEnvironmentImpl(manager, functions, config, env, planner, executor, true);
}
示例13
/**
 * Creates a {@link StreamTableEnvironmentImpl} that uses the supplied executor rather than
 * discovering one.
 *
 * @param env the stream execution environment to attach
 * @param settings source of the built-in catalog/database names, the planner properties and
 *                 the streaming-mode flag
 * @param executor the executor handed to both the planner and the table environment
 * @return the assembled table environment
 */
private static TableEnvironment createStreamTableEnvironment(
        StreamExecutionEnvironment env,
        EnvironmentSettings settings,
        Executor executor) {

    final TableConfig tableConfig = TableConfig.getDefault();

    // Catalog layer.
    final String builtInCatalogName = settings.getBuiltInCatalogName();
    final GenericInMemoryCatalog builtInCatalog = new GenericInMemoryCatalog(
            settings.getBuiltInCatalogName(),
            settings.getBuiltInDatabaseName());
    final CatalogManager catalogs = new CatalogManager(builtInCatalogName, builtInCatalog);
    final FunctionCatalog functions = new FunctionCatalog(catalogs);

    // Planner lookup.
    final Map<String, String> plannerProps = settings.toPlannerProperties();
    final PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, plannerProps);
    final Planner plan = plannerFactory.create(plannerProps, executor, tableConfig, functions, catalogs);

    return new StreamTableEnvironmentImpl(
            catalogs, functions, tableConfig, env, plan, executor, settings.isStreamingMode());
}
示例14
/**
 * Creates a {@link TestCatalog} with the given name.
 *
 * <p>The default database is read from the {@code CATALOG_DEFAULT_DATABASE} property and falls
 * back to {@link GenericInMemoryCatalog#DEFAULT_DB} when that property is absent.
 *
 * @param name name of the catalog to create
 * @param properties catalog properties
 * @return the new catalog
 */
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
    final DescriptorProperties descriptor = new DescriptorProperties(true);
    descriptor.putProperties(properties);

    final String database = descriptor
            .getOptionalString(CATALOG_DEFAULT_DATABASE)
            .orElse(GenericInMemoryCatalog.DEFAULT_DB);
    return new TestCatalog(name, database);
}
示例15
/**
 * Builds a {@link CatalogManager} with a {@link GenericInMemoryCatalog} named
 * "default_catalog" (database "default_database") as its default catalog.
 *
 * @param config configuration object; it is cast to {@link ReadableConfig}
 * @return the built catalog manager
 */
@Override
public Object createCatalogManager(Object config) {
    final CatalogManager manager = CatalogManager.newBuilder()
            .classLoader(Thread.currentThread().getContextClassLoader())
            .config((ReadableConfig) config)
            .defaultCatalog(
                    "default_catalog",
                    new GenericInMemoryCatalog("default_catalog", "default_database"))
            .build();
    return manager;
}
示例16
/**
 * Returns a {@link CatalogManager.Builder} that is already populated with the class loader of
 * {@code CatalogManagerMocks}, an empty {@link Configuration}, a {@link GenericInMemoryCatalog}
 * as default catalog and a fresh {@link ExecutionConfig}. The caller still has to call
 * {@code build()}.
 *
 * @return the pre-configured builder
 */
public static CatalogManager.Builder preparedCatalogManager() {
    final CatalogManager.Builder prepared = CatalogManager.newBuilder()
            .classLoader(CatalogManagerMocks.class.getClassLoader())
            .config(new Configuration())
            .defaultCatalog(
                    DEFAULT_CATALOG,
                    new GenericInMemoryCatalog(DEFAULT_CATALOG, DEFAULT_DATABASE))
            .executionConfig(new ExecutionConfig());
    return prepared;
}
示例17
/**
 * Creates a {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
 * with {@link DataSet}s.
 *
 * <p>Such a TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataSet} to a {@link Table}</li>
 *     <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataSet}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 *
 * <p>The implementation class is loaded by name and instantiated reflectively; any failure
 * along the way is rethrown as a {@link TableException}.
 *
 * @param executionEnvironment The Java batch {@link ExecutionEnvironment} of the TableEnvironment.
 * @param tableConfig The configuration of the TableEnvironment.
 * @return the reflectively created batch table environment
 */
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
    try {
        // temporary solution until FLINK-15635 is fixed
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();

        final ModuleManager modules = new ModuleManager();

        final String builtInCatalogName = "default_catalog";
        final CatalogManager catalogs = CatalogManager.newBuilder()
                .classLoader(contextClassLoader)
                .config(tableConfig.getConfiguration())
                .defaultCatalog(
                        builtInCatalogName,
                        new GenericInMemoryCatalog(builtInCatalogName, "default_database"))
                .executionConfig(executionEnvironment.getConfig())
                .build();

        // Look up the four-argument constructor of the implementation class and invoke it.
        final Class<?> implClass =
                Class.forName("org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl");
        final Constructor<?> implConstructor = implClass.getConstructor(
                ExecutionEnvironment.class,
                TableConfig.class,
                CatalogManager.class,
                ModuleManager.class);
        final Object instance = implConstructor.newInstance(executionEnvironment, tableConfig, catalogs, modules);
        return (BatchTableEnvironment) instance;
    } catch (Throwable t) {
        throw new TableException("Create BatchTableEnvironment failed.", t);
    }
}
示例18
/**
 * Verifies the plan of a SQL tumbling-window count over {@code rowtime} that reads from a view
 * ({@code testTable2}) defined on a {@link CustomCatalogTable} held in a
 * {@link GenericInMemoryCatalog}.
 */
@Test
public void testResolvingSchemaOfCustomCatalogTableSql() throws Exception {
    final TableTestUtil util = getTestUtil();
    final TableEnvironment tEnv = util.getTableEnv();

    final GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("in-memory");
    final ObjectPath tablePath = new ObjectPath("default", "testTable");
    catalog.createTable(tablePath, new CustomCatalogTable(isStreamingMode), false);
    tEnv.registerCatalog("testCatalog", catalog);

    // The view gives the query below an indirection over the catalog table.
    final String createView = "CREATE VIEW testTable2 AS SELECT * FROM testCatalog.`default`.testTable";
    tEnv.executeSql(createView);

    final String query = "SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)";
    util.verifyPlan(query);
}
示例19
/**
 * Runs a {@code create catalog} statement of the generic in-memory type and checks that the
 * catalog is afterwards present and is a {@link GenericInMemoryCatalog}.
 */
@Test
public void testCreateCatalog() {
    final String catalogName = "c1";
    final TableEnvironment env = getTableEnvironment();

    env.executeSql(String.format(
            "create catalog %s with('type'='%s')",
            catalogName,
            CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY));

    assertTrue(env.getCatalog(catalogName).isPresent());
    final Object registered = env.getCatalog(catalogName).get();
    assertTrue(registered instanceof GenericInMemoryCatalog);
}
示例20
/**
 * Instantiates a {@link TestCatalog} named {@code name}.
 *
 * <p>The catalog's default database is taken from the {@code CATALOG_DEFAULT_DATABASE}
 * property; when the property is missing, {@link GenericInMemoryCatalog#DEFAULT_DB} is used.
 *
 * @param name name of the catalog to create
 * @param properties catalog properties
 * @return the new catalog
 */
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
    final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
    descriptorProperties.putProperties(properties);

    final Optional<String> configuredDatabase =
            descriptorProperties.getOptionalString(CATALOG_DEFAULT_DATABASE);
    final String databaseName = configuredDatabase.orElse(GenericInMemoryCatalog.DEFAULT_DB);

    return new TestCatalog(name, databaseName);
}
示例21
/**
 * Creates a {@link GenericInMemoryCatalog} with the given name.
 *
 * @param name name of the catalog to create
 * @param properties catalog properties; not read by this implementation
 * @return the new in-memory catalog
 */
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
    final Catalog inMemoryCatalog = new GenericInMemoryCatalog(name);
    return inMemoryCatalog;
}
示例22
/**
 * Creates a {@link GenericInMemoryCatalog} that already contains one source table.
 *
 * <p>The default database comes from the {@code CATALOG_DEFAULT_DATABASE} property
 * ("default_database" if unset) and the table name from the {@code TEST_TABLE_NAME} property.
 * The table has the columns {@code id INT} and {@code string STRING}, and its stream is built
 * from {@code TABLE_CONTENTS}.
 *
 * @param name name of the catalog to create
 * @param properties catalog properties
 * @return the pre-populated in-memory catalog
 * @throws WrappingRuntimeException if the table cannot be created in the catalog
 */
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
    final String defaultDatabase = properties.getOrDefault(
            CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE,
            "default_database");
    final GenericInMemoryCatalog catalog = new GenericInMemoryCatalog(name, defaultDatabase);

    // When the property is not set, the property key itself is used as the table name.
    final String table = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME);

    final StreamTableSource<Row> source = new StreamTableSource<Row>() {

        /** Emits {@code TABLE_CONTENTS} typed as rows of (id INT, string STRING). */
        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv
                    .fromCollection(TABLE_CONTENTS)
                    .returns(new RowTypeInfo(
                            new TypeInformation[]{Types.INT(), Types.STRING()},
                            new String[]{"id", "string"}));
        }

        /** Schema with the two columns {@code id} and {@code string}. */
        @Override
        public TableSchema getTableSchema() {
            return TableSchema.builder()
                    .field("id", DataTypes.INT())
                    .field("string", DataTypes.STRING())
                    .build();
        }

        /** Row type with the same fields as {@link #getTableSchema()}. */
        @Override
        public DataType getProducedDataType() {
            return DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("string", DataTypes.STRING()));
        }
    };

    try {
        catalog.createTable(
                new ObjectPath(defaultDatabase, table),
                ConnectorCatalogTable.source(source, false),
                false);
    } catch (Exception e) {
        throw new WrappingRuntimeException(e);
    }

    return catalog;
}
示例23
/**
 * Creates a {@link CatalogManager} whose built-in catalog is a {@link GenericInMemoryCatalog}
 * named "default_catalog" with database "default_database".
 *
 * @param config configuration object; not read by this implementation
 * @return the new catalog manager
 */
@Override
public Object createCatalogManager(Object config) {
    final GenericInMemoryCatalog builtInCatalog =
            new GenericInMemoryCatalog("default_catalog", "default_database");
    return new CatalogManager("default_catalog", builtInCatalog);
}
示例24
/**
 * Returns a new {@link GenericInMemoryCatalog} carrying the requested name.
 *
 * @param name name of the catalog to create
 * @param properties catalog properties; not read by this implementation
 * @return the new in-memory catalog
 */
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
    final Catalog created = new GenericInMemoryCatalog(name);
    return created;
}
示例25
/**
 * Assembles a {@link TableEnvironmentImpl} from the given settings.
 *
 * <p>The catalog manager is built with the context class loader of the calling thread and a
 * {@link GenericInMemoryCatalog} as default catalog; the executor and the planner are
 * discovered through {@link ComponentFactoryService}.
 *
 * @param settings source of the built-in catalog/database names, the executor and planner
 *                 properties and the streaming-mode flag
 * @return the newly created table environment
 */
public static TableEnvironmentImpl create(EnvironmentSettings settings) {
    // temporary solution until FLINK-15635 is fixed
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();

    final TableConfig config = new TableConfig();
    final ModuleManager modules = new ModuleManager();

    final CatalogManager catalogs = CatalogManager.newBuilder()
            .classLoader(contextClassLoader)
            .config(config.getConfiguration())
            .defaultCatalog(
                    settings.getBuiltInCatalogName(),
                    new GenericInMemoryCatalog(
                            settings.getBuiltInCatalogName(),
                            settings.getBuiltInDatabaseName()))
            .build();
    final FunctionCatalog functions = new FunctionCatalog(config, catalogs, modules);

    // Executor lookup.
    final Map<String, String> executorProps = settings.toExecutorProperties();
    final Executor exec = ComponentFactoryService
            .find(ExecutorFactory.class, executorProps)
            .create(executorProps);

    // Planner lookup.
    final Map<String, String> plannerProps = settings.toPlannerProperties();
    final Planner plan = ComponentFactoryService
            .find(PlannerFactory.class, plannerProps)
            .create(plannerProps, exec, config, functions, catalogs);

    return new TableEnvironmentImpl(
            catalogs, modules, config, exec, functions, plan, settings.isStreamingMode());
}
示例26
/**
 * Creates a {@link StreamTableEnvironment} on top of the given execution environment.
 *
 * <p>The catalog manager is built with the context class loader of the calling thread, the
 * configuration of {@code tableConfig}, a {@link GenericInMemoryCatalog} as default catalog
 * and the execution config of {@code executionEnvironment}.
 *
 * @param executionEnvironment the streaming execution environment to bind to
 * @param settings source of the built-in catalog/database names and of the executor and
 *                 planner properties; must be in streaming mode
 * @param tableConfig configuration handed to the catalog manager, function catalog, planner
 *                    and table environment
 * @return the newly created stream table environment
 * @throws TableException if {@code settings} is not in streaming mode
 */
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {

    final boolean streaming = settings.isStreamingMode();
    if (!streaming) {
        throw new TableException(
                "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
    }

    // temporary solution until FLINK-15635 is fixed
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();

    final ModuleManager modules = new ModuleManager();

    final CatalogManager catalogs = CatalogManager.newBuilder()
            .classLoader(contextClassLoader)
            .config(tableConfig.getConfiguration())
            .defaultCatalog(
                    settings.getBuiltInCatalogName(),
                    new GenericInMemoryCatalog(
                            settings.getBuiltInCatalogName(),
                            settings.getBuiltInDatabaseName()))
            .executionConfig(executionEnvironment.getConfig())
            .build();
    final FunctionCatalog functions = new FunctionCatalog(tableConfig, catalogs, modules);

    // Executor and planner.
    final Map<String, String> executorProps = settings.toExecutorProperties();
    final Executor exec = lookupExecutor(executorProps, executionEnvironment);

    final Map<String, String> plannerProps = settings.toPlannerProperties();
    final Planner plan = ComponentFactoryService
            .find(PlannerFactory.class, plannerProps)
            .create(plannerProps, exec, tableConfig, functions, catalogs);

    return new StreamTableEnvironmentImpl(
            catalogs,
            modules,
            functions,
            tableConfig,
            executionEnvironment,
            plan,
            exec,
            settings.isStreamingMode());
}
示例27
/**
 * Sets up the session state and the table environment of this context.
 *
 * <p>When {@code sessionState} is {@code null}, a new {@link ModuleManager},
 * {@link CatalogManager} and {@link FunctionCatalog} are created and stored as the session
 * state; afterwards the configured modules are loaded and {@code registerFunctions()} and
 * {@code initializeCatalogs()} are invoked. Otherwise the given state is stored unchanged and
 * only the table environment is created on top of it.
 *
 * @param sessionState state inherited from an existing session, or {@code null} to build new state
 */
private void initializeTableEnvironment(@Nullable SessionState sessionState) {
final EnvironmentSettings settings = environment.getExecution().getEnvironmentSettings();
// True when there is no state to reuse, i.e. managers and catalogs must be created below.
final boolean noInheritedState = sessionState == null;
// Step 0.0 Initialize the table configuration.
final TableConfig config = createTableConfig();
if (noInheritedState) {
//--------------------------------------------------------------------------------------------------------------
// Step.1 Create environments
//--------------------------------------------------------------------------------------------------------------
// Step 1.0 Initialize the ModuleManager.
final ModuleManager moduleManager = new ModuleManager();
// Step 1.1 Initialize the CatalogManager with an in-memory built-in catalog.
final CatalogManager catalogManager = CatalogManager.newBuilder()
.classLoader(classLoader)
.config(config.getConfiguration())
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.build();
// Step 1.2 Initialize the FunctionCatalog.
final FunctionCatalog functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager);
// Step 1.3 Set up session state.
this.sessionState = SessionState.of(catalogManager, moduleManager, functionCatalog);
// The table environment must be created before Steps 2-4: Step 2 below calls tableEnv
// directly (presumably createTableEnvironment assigns tableEnv - confirm).
createTableEnvironment(settings, config, catalogManager, moduleManager, functionCatalog);
//--------------------------------------------------------------------------------------------------------------
// Step.2 Create modules and load them into the TableEnvironment.
//--------------------------------------------------------------------------------------------------------------
// Modules only need to be registered here; with an inherited session (else branch) this is skipped.
Map<String, Module> modules = new LinkedHashMap<>();
environment.getModules().forEach((name, entry) ->
modules.put(name, createModule(entry.asMap(), classLoader))
);
if (!modules.isEmpty()) {
// unload core module first to respect whatever users configure
tableEnv.unloadModule(CoreModuleDescriptorValidator.MODULE_TYPE_CORE);
modules.forEach(tableEnv::loadModule);
}
//--------------------------------------------------------------------------------------------------------------
// Step.3 create user-defined functions and temporal tables then register them.
//--------------------------------------------------------------------------------------------------------------
// Functions only need to be registered here; with an inherited session (else branch) this is skipped.
registerFunctions();
//--------------------------------------------------------------------------------------------------------------
// Step.4 Create catalogs and register them.
//--------------------------------------------------------------------------------------------------------------
// Catalogs only need to be registered here; with an inherited session (else branch) this is skipped.
initializeCatalogs();
} else {
// Set up session state by reusing the inherited managers and function catalog.
this.sessionState = sessionState;
createTableEnvironment(
settings,
config,
sessionState.catalogManager,
sessionState.moduleManager,
sessionState.functionCatalog);
}
}
示例28
/**
 * Builds a {@link GenericInMemoryCatalog} and registers a single source table in it.
 *
 * <p>The database is read from the {@code CATALOG_DEFAULT_DATABASE} property (falling back to
 * "default_database"), the table name from the {@code TEST_TABLE_NAME} property. The table
 * exposes the columns {@code id INT} and {@code string STRING} and streams
 * {@code TABLE_CONTENTS}.
 *
 * @param name name of the catalog to create
 * @param properties catalog properties
 * @return the in-memory catalog containing the table
 * @throws WrappingRuntimeException if registering the table fails
 */
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
    final String databaseName = properties.getOrDefault(
            CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE,
            "default_database");
    final GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog(name, databaseName);

    // Without an explicit property value, the property key doubles as the table name.
    final String sourceTableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME);

    final StreamTableSource<Row> rowSource = new StreamTableSource<Row>() {

        /** Streams {@code TABLE_CONTENTS} as rows typed (id INT, string STRING). */
        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv
                    .fromCollection(TABLE_CONTENTS)
                    .returns(new RowTypeInfo(
                            new TypeInformation[]{Types.INT(), Types.STRING()},
                            new String[]{"id", "string"}));
        }

        /** Two-column schema: {@code id} and {@code string}. */
        @Override
        public TableSchema getTableSchema() {
            return TableSchema.builder()
                    .field("id", DataTypes.INT())
                    .field("string", DataTypes.STRING())
                    .build();
        }

        /** Row data type with the same two fields as the schema. */
        @Override
        public DataType getProducedDataType() {
            return DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("string", DataTypes.STRING()));
        }
    };

    try {
        inMemoryCatalog.createTable(
                new ObjectPath(databaseName, sourceTableName),
                ConnectorCatalogTable.source(rowSource, false),
                false);
    } catch (Exception e) {
        throw new WrappingRuntimeException(e);
    }

    return inMemoryCatalog;
}
示例29
/**
 * Demo entry point: creates a streaming table environment, registers an in-memory catalog in
 * it under the name "InMemCatalog", and then creates a batch table environment.
 *
 * @param args command-line arguments; not read by this method
 */
public static void main(String[] args) {
    // Streaming job
    final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamExecEnv);

    final Catalog inMemoryCatalog = new GenericInMemoryCatalog("zhisheng");
    streamTableEnv.registerCatalog("InMemCatalog", inMemoryCatalog);

    // Batch job
    final ExecutionEnvironment batchExecEnv = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchExecEnv);
}
示例30
/**
 * Returns a {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
 * with {@link DataSet}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataSet} to a {@link Table}</li>
 *     <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataSet}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 *
 * <p>The implementation class is looked up by name and instantiated reflectively, with a
 * {@link CatalogManager} backed by a {@link GenericInMemoryCatalog} ("default_catalog" /
 * "default_database").
 *
 * @param executionEnvironment The Java batch {@link ExecutionEnvironment} of the TableEnvironment.
 * @param tableConfig The configuration of the TableEnvironment.
 * @return the reflectively created batch table environment
 * @throws TableException if the implementation class cannot be loaded or instantiated; the
 *                        original failure is kept as the cause
 */
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
    try {
        Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
        // Wildcard-typed instead of a raw Constructor: the concrete class is only known at runtime.
        Constructor<?> con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
        String defaultCatalog = "default_catalog";
        CatalogManager catalogManager = new CatalogManager(
                defaultCatalog,
                new GenericInMemoryCatalog(defaultCatalog, "default_database"));
        return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
    } catch (Throwable t) {
        // Deliberately broad: every failure is reported to callers as a TableException.
        throw new TableException("Create BatchTableEnvironment failed.", t);
    }
}