Java源码示例:org.apache.flink.table.catalog.GenericInMemoryCatalog

示例1
public static void main(String[] args) {
        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        blinkStreamEnv.setParallelism(1);
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(blinkStreamEnv, blinkStreamSettings);

        blinkStreamTableEnv.registerCatalog("zhisheng", new GenericInMemoryCatalog("zhisheng"));
        //GenericInMemoryCatalog,默认的 catalog


        //HiveCatalog,这个需要添加 Hive connector 和 Hive 的依赖
//        blinkStreamTableEnv.registerCatalog("zhisheng", new HiveCatalog("zhisheng", "zhisheng", "~/zhisheng/hive/conf", "2.3.4"));


    }
 
示例2
public static TableEnvironmentImpl create(EnvironmentSettings settings) {

		CatalogManager catalogManager = new CatalogManager(
			settings.getBuiltInCatalogName(),
			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));

		FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);

		Map<String, String> executorProperties = settings.toExecutorProperties();
		Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
			.create(executorProperties);

		TableConfig tableConfig = new TableConfig();
		Map<String, String> plannerProperties = settings.toPlannerProperties();
		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
			.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);

		return new TableEnvironmentImpl(
			catalogManager,
			tableConfig,
			executor,
			functionCatalog,
			planner,
			settings.isStreamingMode()
		);
	}
 
示例3
public static void main(String[] args) {
        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        blinkStreamEnv.setParallelism(1);
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(blinkStreamEnv, blinkStreamSettings);

        blinkStreamTableEnv.registerCatalog("zhisheng", new GenericInMemoryCatalog("zhisheng"));
        //GenericInMemoryCatalog,默认的 catalog


        //HiveCatalog,这个需要添加 Hive connector 和 Hive 的依赖
//        blinkStreamTableEnv.registerCatalog("zhisheng", new HiveCatalog("zhisheng", "zhisheng", "~/zhisheng/hive/conf", "2.3.4"));


    }
 
示例4
@Test
public void testResolvingSchemaOfCustomCatalogTableTableApi() throws Exception {
	TableTestUtil testUtil = getTestUtil();
	TableEnvironment tableEnvironment = testUtil.getTableEnv();
	GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
	genericInMemoryCatalog.createTable(
		new ObjectPath("default", "testTable"),
		new CustomCatalogTable(isStreamingMode),
		false);
	tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);

	Table table = tableEnvironment.from("testCatalog.`default`.testTable")
		.window(Tumble.over(lit(10).minute()).on($("rowtime")).as("w"))
		.groupBy($("w"))
		.select(lit(1).count());
	testUtil.verifyPlan(table);
}
 
示例5
@Test
public void testResolvingProctimeOfCustomTableSql() throws Exception {
	if (!isStreamingMode) {
		// proctime not supported in batch
		return;
	}
	TableTestUtil testUtil = getTestUtil();
	TableEnvironment tableEnvironment = testUtil.getTableEnv();
	GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
	genericInMemoryCatalog.createTable(
		new ObjectPath("default", "testTable"),
		new CustomCatalogTable(isStreamingMode),
		false);
	tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);

	testUtil.verifyPlan("SELECT COUNT(*) FROM testCatalog.`default`.testTable " +
		"GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE)");
}
 
示例6
@Test
public void testResolvingProctimeOfCustomTableTableApi() throws Exception {
	if (!isStreamingMode) {
		// proctime not supported in batch
		return;
	}
	TableTestUtil testUtil = getTestUtil();
	TableEnvironment tableEnvironment = testUtil.getTableEnv();
	GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
	genericInMemoryCatalog.createTable(
		new ObjectPath("default", "testTable"),
		new CustomCatalogTable(isStreamingMode),
		false);
	tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);

	Table table = tableEnvironment.from("testCatalog.`default`.testTable")
		.window(Tumble.over(lit(10).minute()).on($("proctime")).as("w"))
		.groupBy($("w"))
		.select(lit(1).count());
	testUtil.verifyPlan(table);
}
 
示例7
@Test
public void testAlterTableAddPkConstraintEnforced() throws Exception {
	Catalog catalog = new GenericInMemoryCatalog("default", "default");
	catalogManager.registerCatalog("cat1", catalog);
	catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
	CatalogTable catalogTable = new CatalogTableImpl(
			TableSchema.builder()
					.field("a", DataTypes.STRING().notNull())
					.field("b", DataTypes.BIGINT().notNull())
					.field("c", DataTypes.BIGINT())
					.build(),
			new HashMap<>(),
			"tb1");
	catalogManager.setCurrentCatalog("cat1");
	catalogManager.setCurrentDatabase("db1");
	catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
	// Test alter table add enforced
	thrown.expect(ValidationException.class);
	thrown.expectMessage("Flink doesn't support ENFORCED mode for PRIMARY KEY constaint. "
			+ "ENFORCED/NOT ENFORCED  controls if the constraint checks are performed on the "
			+ "incoming/outgoing data. Flink does not own the data therefore the "
			+ "only supported mode is the NOT ENFORCED mode");
	parse("alter table tb1 add constraint ct1 primary key(a, b)",
			SqlDialect.DEFAULT);
}
 
示例8
@Test
public void testAlterTableAddUniqueConstraint() throws Exception {
	Catalog catalog = new GenericInMemoryCatalog("default", "default");
	catalogManager.registerCatalog("cat1", catalog);
	catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
	CatalogTable catalogTable = new CatalogTableImpl(
			TableSchema.builder()
					.field("a", DataTypes.STRING().notNull())
					.field("b", DataTypes.BIGINT().notNull())
					.build(),
			new HashMap<>(),
			"tb1");
	catalogManager.setCurrentCatalog("cat1");
	catalogManager.setCurrentDatabase("db1");
	catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
	// Test alter add table constraint.
	thrown.expect(UnsupportedOperationException.class);
	thrown.expectMessage("UNIQUE constraint is not supported yet");
	parse("alter table tb1 add constraint ct1 unique(a, b) not enforced",
			SqlDialect.DEFAULT);
}
 
示例9
@Test
public void testAlterTableAddUniqueConstraintEnforced() throws Exception {
	Catalog catalog = new GenericInMemoryCatalog("default", "default");
	catalogManager.registerCatalog("cat1", catalog);
	catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
	CatalogTable catalogTable = new CatalogTableImpl(
			TableSchema.builder()
					.field("a", DataTypes.STRING().notNull())
					.field("b", DataTypes.BIGINT().notNull())
					.field("c", DataTypes.BIGINT())
					.build(),
			new HashMap<>(),
			"tb1");
	catalogManager.setCurrentCatalog("cat1");
	catalogManager.setCurrentDatabase("db1");
	catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
	// Test alter table add enforced
	thrown.expect(UnsupportedOperationException.class);
	thrown.expectMessage("UNIQUE constraint is not supported yet");
	parse("alter table tb1 add constraint ct1 unique(a, b)",
			SqlDialect.DEFAULT);
}
 
示例10
private static CatalogManager createCatalogManager() {
	return new CatalogManager(
		EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
		new GenericInMemoryCatalog(
			EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
			EnvironmentSettings.DEFAULT_BUILTIN_DATABASE));
}
 
示例11
public static StreamTableEnvironment create(
		StreamExecutionEnvironment executionEnvironment,
		EnvironmentSettings settings,
		TableConfig tableConfig) {

	if (!settings.isStreamingMode()) {
		throw new TableException(
			"StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
	}

	CatalogManager catalogManager = new CatalogManager(
		settings.getBuiltInCatalogName(),
		new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));

	FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);

	Map<String, String> executorProperties = settings.toExecutorProperties();
	Executor executor = lookupExecutor(executorProperties, executionEnvironment);

	Map<String, String> plannerProperties = settings.toPlannerProperties();
	Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
		.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);

	return new StreamTableEnvironmentImpl(
		catalogManager,
		functionCatalog,
		tableConfig,
		executionEnvironment,
		planner,
		executor,
		settings.isStreamingMode()
	);
}
 
示例12
private StreamTableEnvironmentImpl getStreamTableEnvironment(
		StreamExecutionEnvironment env,
		DataStreamSource<Integer> elements) {
	CatalogManager catalogManager = new CatalogManager("cat", new GenericInMemoryCatalog("cat", "db"));
	return new StreamTableEnvironmentImpl(
		catalogManager,
		new FunctionCatalog(catalogManager),
		new TableConfig(),
		env,
		new TestPlanner(elements.getTransformation()),
		executor,
		true
	);
}
 
示例13
private static TableEnvironment createStreamTableEnvironment(
		StreamExecutionEnvironment env,
		EnvironmentSettings settings,
		Executor executor) {

	final TableConfig config = TableConfig.getDefault();

	final CatalogManager catalogManager = new CatalogManager(
		settings.getBuiltInCatalogName(),
		new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));

	final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);

	final Map<String, String> plannerProperties = settings.toPlannerProperties();
	final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
		.create(plannerProperties, executor, config, functionCatalog, catalogManager);

	return new StreamTableEnvironmentImpl(
		catalogManager,
		functionCatalog,
		config,
		env,
		planner,
		executor,
		settings.isStreamingMode()
	);
}
 
示例14
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
	final DescriptorProperties params = new DescriptorProperties(true);
	params.putProperties(properties);

	final Optional<String> defaultDatabase = params.getOptionalString(CATALOG_DEFAULT_DATABASE);

	return new TestCatalog(name, defaultDatabase.orElse(GenericInMemoryCatalog.DEFAULT_DB));
}
 
示例15
@Override
public Object createCatalogManager(Object config) {
  return CatalogManager.newBuilder()
          .classLoader(Thread.currentThread().getContextClassLoader())
          .config((ReadableConfig) config)
          .defaultCatalog("default_catalog",
                  new GenericInMemoryCatalog("default_catalog", "default_database"))
          .build();
}
 
示例16
public static CatalogManager.Builder preparedCatalogManager() {
	return CatalogManager.newBuilder()
		.classLoader(CatalogManagerMocks.class.getClassLoader())
		.config(new Configuration())
		.defaultCatalog(DEFAULT_CATALOG,
			new GenericInMemoryCatalog(DEFAULT_CATALOG, DEFAULT_DATABASE))
		.executionConfig(new ExecutionConfig());
}
 
示例17
/**
 * Returns a {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
 * with {@link DataSet}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataSet} to a {@link Table}</li>
 *     <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataSet}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 *
 * @param executionEnvironment The Java batch {@link ExecutionEnvironment} of the TableEnvironment.
 * @param tableConfig The configuration of the TableEnvironment.
 */
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
	try {
		// temporary solution until FLINK-15635 is fixed
		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

		ModuleManager moduleManager = new ModuleManager();

		String defaultCatalog = "default_catalog";
		CatalogManager catalogManager = CatalogManager.newBuilder()
			.classLoader(classLoader)
			.config(tableConfig.getConfiguration())
			.defaultCatalog(
				defaultCatalog,
				new GenericInMemoryCatalog(defaultCatalog, "default_database"))
			.executionConfig(executionEnvironment.getConfig())
			.build();

		Class<?> clazz = Class.forName("org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl");
		Constructor<?> con = clazz.getConstructor(
			ExecutionEnvironment.class,
			TableConfig.class,
			CatalogManager.class,
			ModuleManager.class);
		return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager);
	} catch (Throwable t) {
		throw new TableException("Create BatchTableEnvironment failed.", t);
	}
}
 
示例18
@Test
public void testResolvingSchemaOfCustomCatalogTableSql() throws Exception {
	TableTestUtil testUtil = getTestUtil();
	TableEnvironment tableEnvironment = testUtil.getTableEnv();
	GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
	genericInMemoryCatalog.createTable(
		new ObjectPath("default", "testTable"),
		new CustomCatalogTable(isStreamingMode),
		false);
	tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);
	tableEnvironment.executeSql("CREATE VIEW testTable2 AS SELECT * FROM testCatalog.`default`.testTable");

	testUtil.verifyPlan(
		"SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)");
}
 
示例19
@Test
public void testCreateCatalog() {
	String name = "c1";
	TableEnvironment tableEnv = getTableEnvironment();
	String ddl = String.format("create catalog %s with('type'='%s')", name, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY);

	tableEnv.executeSql(ddl);

	assertTrue(tableEnv.getCatalog(name).isPresent());
	assertTrue(tableEnv.getCatalog(name).get() instanceof GenericInMemoryCatalog);
}
 
示例20
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
	final DescriptorProperties params = new DescriptorProperties(true);
	params.putProperties(properties);

	final Optional<String> defaultDatabase = params.getOptionalString(CATALOG_DEFAULT_DATABASE);

	return new TestCatalog(name, defaultDatabase.orElse(GenericInMemoryCatalog.DEFAULT_DB));
}
 
示例21
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
	return new GenericInMemoryCatalog(name);
}
 
示例22
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
	String database = properties.getOrDefault(
		CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE,
		"default_database");
	GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog(name, database);

	String tableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME);
	StreamTableSource<Row> tableSource = new StreamTableSource<Row>() {
		@Override
		public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
			return execEnv.fromCollection(TABLE_CONTENTS)
				.returns(new RowTypeInfo(
					new TypeInformation[]{Types.INT(), Types.STRING()},
					new String[]{"id", "string"}));
		}

		@Override
		public TableSchema getTableSchema() {
			return TableSchema.builder()
				.field("id", DataTypes.INT())
				.field("string", DataTypes.STRING())
				.build();
		}

		@Override
		public DataType getProducedDataType() {
			return DataTypes.ROW(
				DataTypes.FIELD("id", DataTypes.INT()),
				DataTypes.FIELD("string", DataTypes.STRING())
			);
		}
	};

	try {
		genericInMemoryCatalog.createTable(
			new ObjectPath(database, tableName),
			ConnectorCatalogTable.source(tableSource, false),
			false
		);
	} catch (Exception e) {
		throw new WrappingRuntimeException(e);
	}

	return genericInMemoryCatalog;
}
 
示例23
@Override
public Object createCatalogManager(Object config) {
  return new CatalogManager("default_catalog",
          new GenericInMemoryCatalog("default_catalog", "default_database"));
}
 
示例24
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
	return new GenericInMemoryCatalog(name);
}
 
示例25
public static TableEnvironmentImpl create(EnvironmentSettings settings) {

		// temporary solution until FLINK-15635 is fixed
		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

		TableConfig tableConfig = new TableConfig();

		ModuleManager moduleManager = new ModuleManager();

		CatalogManager catalogManager = CatalogManager.newBuilder()
			.classLoader(classLoader)
			.config(tableConfig.getConfiguration())
			.defaultCatalog(
				settings.getBuiltInCatalogName(),
				new GenericInMemoryCatalog(
					settings.getBuiltInCatalogName(),
					settings.getBuiltInDatabaseName()))
			.build();

		FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);

		Map<String, String> executorProperties = settings.toExecutorProperties();
		Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
			.create(executorProperties);

		Map<String, String> plannerProperties = settings.toPlannerProperties();
		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
			.create(
				plannerProperties,
				executor,
				tableConfig,
				functionCatalog,
				catalogManager);

		return new TableEnvironmentImpl(
			catalogManager,
			moduleManager,
			tableConfig,
			executor,
			functionCatalog,
			planner,
			settings.isStreamingMode()
		);
	}
 
示例26
public static StreamTableEnvironment create(
		StreamExecutionEnvironment executionEnvironment,
		EnvironmentSettings settings,
		TableConfig tableConfig) {

	if (!settings.isStreamingMode()) {
		throw new TableException(
			"StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
	}

	// temporary solution until FLINK-15635 is fixed
	ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

	ModuleManager moduleManager = new ModuleManager();

	CatalogManager catalogManager = CatalogManager.newBuilder()
		.classLoader(classLoader)
		.config(tableConfig.getConfiguration())
		.defaultCatalog(
			settings.getBuiltInCatalogName(),
			new GenericInMemoryCatalog(
				settings.getBuiltInCatalogName(),
				settings.getBuiltInDatabaseName()))
		.executionConfig(executionEnvironment.getConfig())
		.build();

	FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);

	Map<String, String> executorProperties = settings.toExecutorProperties();
	Executor executor = lookupExecutor(executorProperties, executionEnvironment);

	Map<String, String> plannerProperties = settings.toPlannerProperties();
	Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
		.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);

	return new StreamTableEnvironmentImpl(
		catalogManager,
		moduleManager,
		functionCatalog,
		tableConfig,
		executionEnvironment,
		planner,
		executor,
		settings.isStreamingMode()
	);
}
 
示例27
private void initializeTableEnvironment(@Nullable SessionState sessionState) {
	final EnvironmentSettings settings = environment.getExecution().getEnvironmentSettings();
	final boolean noInheritedState = sessionState == null;
	// Step 0.0 Initialize the table configuration.
	final TableConfig config = createTableConfig();

	if (noInheritedState) {
		//--------------------------------------------------------------------------------------------------------------
		// Step.1 Create environments
		//--------------------------------------------------------------------------------------------------------------

		// Step 1.0 Initialize the ModuleManager if required.
		final ModuleManager moduleManager = new ModuleManager();

		// Step 1.1 Initialize the CatalogManager if required.
		final CatalogManager catalogManager = CatalogManager.newBuilder()
			.classLoader(classLoader)
			.config(config.getConfiguration())
			.defaultCatalog(
				settings.getBuiltInCatalogName(),
				new GenericInMemoryCatalog(
					settings.getBuiltInCatalogName(),
					settings.getBuiltInDatabaseName()))
			.build();

		// Step 1.2 Initialize the FunctionCatalog if required.
		final FunctionCatalog functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager);

		// Step 1.3 Set up session state.
		this.sessionState = SessionState.of(catalogManager, moduleManager, functionCatalog);

		// Must initialize the table environment before actually the
		createTableEnvironment(settings, config, catalogManager, moduleManager, functionCatalog);

		//--------------------------------------------------------------------------------------------------------------
		// Step.2 Create modules and load them into the TableEnvironment.
		//--------------------------------------------------------------------------------------------------------------
		// No need to register the modules info if already inherit from the same session.
		Map<String, Module> modules = new LinkedHashMap<>();
		environment.getModules().forEach((name, entry) ->
				modules.put(name, createModule(entry.asMap(), classLoader))
		);
		if (!modules.isEmpty()) {
			// unload core module first to respect whatever users configure
			tableEnv.unloadModule(CoreModuleDescriptorValidator.MODULE_TYPE_CORE);
			modules.forEach(tableEnv::loadModule);
		}

		//--------------------------------------------------------------------------------------------------------------
		// Step.3 create user-defined functions and temporal tables then register them.
		//--------------------------------------------------------------------------------------------------------------
		// No need to register the functions if already inherit from the same session.
		registerFunctions();

		//--------------------------------------------------------------------------------------------------------------
		// Step.4 Create catalogs and register them.
		//--------------------------------------------------------------------------------------------------------------
		// No need to register the catalogs if already inherit from the same session.
		initializeCatalogs();
	} else {
		// Set up session state.
		this.sessionState = sessionState;
		createTableEnvironment(
				settings,
				config,
				sessionState.catalogManager,
				sessionState.moduleManager,
				sessionState.functionCatalog);
	}
}
 
示例28
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
	String database = properties.getOrDefault(
		CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE,
		"default_database");
	GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog(name, database);

	String tableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME);
	StreamTableSource<Row> tableSource = new StreamTableSource<Row>() {
		@Override
		public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
			return execEnv.fromCollection(TABLE_CONTENTS)
				.returns(new RowTypeInfo(
					new TypeInformation[]{Types.INT(), Types.STRING()},
					new String[]{"id", "string"}));
		}

		@Override
		public TableSchema getTableSchema() {
			return TableSchema.builder()
				.field("id", DataTypes.INT())
				.field("string", DataTypes.STRING())
				.build();
		}

		@Override
		public DataType getProducedDataType() {
			return DataTypes.ROW(
				DataTypes.FIELD("id", DataTypes.INT()),
				DataTypes.FIELD("string", DataTypes.STRING())
			);
		}
	};

	try {
		genericInMemoryCatalog.createTable(
			new ObjectPath(database, tableName),
			ConnectorCatalogTable.source(tableSource, false),
			false
		);
	} catch (Exception e) {
		throw new WrappingRuntimeException(e);
	}

	return genericInMemoryCatalog;
}
 
示例29
public static void main(String[] args) {
    //流作业
    StreamTableEnvironment sEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());

    Catalog catalog = new GenericInMemoryCatalog("zhisheng");
    sEnv.registerCatalog("InMemCatalog", catalog);


    //批作业
    BatchTableEnvironment bEnv = BatchTableEnvironment.create(ExecutionEnvironment.getExecutionEnvironment());

}
 
示例30
/**
 * Returns a {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
 * with {@link DataSet}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataSet} to a {@link Table}</li>
 *     <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataSet}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 *
 * @param executionEnvironment The Java batch {@link ExecutionEnvironment} of the TableEnvironment.
 * @param tableConfig The configuration of the TableEnvironment.
 */
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
	try {
		Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
		Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
		String defaultCatalog = "default_catalog";
		CatalogManager catalogManager = new CatalogManager(
			defaultCatalog,
			new GenericInMemoryCatalog(defaultCatalog, "default_database")
		);
		return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
	} catch (Throwable t) {
		throw new TableException("Create BatchTableEnvironment failed.", t);
	}
}