Java Code Examples: org.apache.flink.table.sources.StreamTableSource
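
StreamTableSource is the legacy (pre-FLIP-95) interface in the Flink Table API for tables backed by a DataStream: implementations provide getDataStream(...) plus schema information, and isBounded() tells the planner whether the source may also run in batch mode. The examples below are collected from Flink and third-party connectors. As a primer, here is a minimal hand-written sketch; the class name SingleRowTableSource and its field names are illustrative assumptions, not code from any shipped connector.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class SingleRowTableSource implements StreamTableSource<Row> {

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// emit one hard-coded row; a real source would wrap an external system
		return execEnv.fromElements(Row.of(1, "hello"))
			.returns(new RowTypeInfo(
				new TypeInformation[]{Types.INT, Types.STRING},
				new String[]{"id", "msg"}));
	}

	@Override
	public boolean isBounded() {
		// finite input, so this source can also be used in batch mode
		return true;
	}

	@Override
	public TableSchema getTableSchema() {
		return TableSchema.builder()
			.field("id", DataTypes.INT())
			.field("msg", DataTypes.STRING())
			.build();
	}

	@Override
	public DataType getProducedDataType() {
		return getTableSchema().toRowDataType();
	}
}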

Example 1
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
	final DescriptorProperties descriptorProperties = getValidatedProperties(properties);

	final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
	final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties);
	final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);

	return createKafkaTableSource(
		descriptorProperties.getTableSchema(SCHEMA()),
		SchemaValidator.deriveProctimeAttribute(descriptorProperties),
		SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
		SchemaValidator.deriveFieldMapping(
			descriptorProperties,
			Optional.of(deserializationSchema.getProducedType())),
		topic,
		getKafkaProperties(descriptorProperties),
		deserializationSchema,
		startupOptions.startupMode,
		startupOptions.specificOffsets);
}
 
Example 2
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
	final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
	// Create the default configuration from the current runtime env (`hbase-site.xml` on the classpath) first.
	Configuration hbaseClientConf = HBaseConfiguration.create();
	String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
	hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
	descriptorProperties
		.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
		.ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));

	String hTableName = descriptorProperties.getString(CONNECTOR_TABLE_NAME);
	TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
	HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
	return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
}
 
Example 3
@Test
public void testGenericTable() throws Exception {
	TableSchema schema = TableSchema.builder()
		.field("name", DataTypes.STRING())
		.field("age", DataTypes.INT())
		.build();

	Map<String, String> properties = new HashMap<>();
	properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
	properties.put("connector", "COLLECTION");

	catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
	ObjectPath path = new ObjectPath("mydb", "mytable");
	CatalogTable table = new CatalogTableImpl(schema, properties, "csv table");
	catalog.createTable(path, table, true);
	Optional<TableFactory> opt = catalog.getTableFactory();
	assertTrue(opt.isPresent());
	HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
	TableSource tableSource = tableFactory.createTableSource(path, table);
	assertTrue(tableSource instanceof StreamTableSource);
	TableSink tableSink = tableFactory.createTableSink(path, table);
	assertTrue(tableSink instanceof StreamTableSink);
}
 
Example 4
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
	final DescriptorProperties descriptorProperties = getValidatedProperties(properties);

	final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
	final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties);
	final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);

	return createKafkaTableSource(
		descriptorProperties.getTableSchema(SCHEMA),
		SchemaValidator.deriveProctimeAttribute(descriptorProperties),
		SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
		SchemaValidator.deriveFieldMapping(
			descriptorProperties,
			Optional.of(deserializationSchema.getProducedType())),
		topic,
		getKafkaProperties(descriptorProperties),
		deserializationSchema,
		startupOptions.startupMode,
		startupOptions.specificOffsets);
}
 
Example 5
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
	TableSource<?> tableSource;
	Optional<TableFactory> tableFactory = catalog.getTableFactory();
	if (tableFactory.isPresent()) {
		TableFactory tf = tableFactory.get();
		if (tf instanceof TableSourceFactory) {
			tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table);
		} else {
			throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
				catalog.getClass()));
		}
	} else {
		tableSource = TableFactoryUtil.findAndCreateTableSource(table);
	}

	if (!(tableSource instanceof StreamTableSource)) {
		throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
	}

	return new TableSourceTable<>(
		tableSource,
		!((StreamTableSource<?>) tableSource).isBounded(),
		FlinkStatistic.UNKNOWN()
	);
}
 
Example 6
@Test
public void testJdbcCommonProperties() {
	Map<String, String> properties = getBasicProperties();
	properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver");
	properties.put("connector.username", "user");
	properties.put("connector.password", "pass");

	final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
		.createStreamTableSource(properties);

	final JdbcOptions options = JdbcOptions.builder()
		.setDBUrl("jdbc:derby:memory:mydb")
		.setTableName("mytable")
		.setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
		.setUsername("user")
		.setPassword("pass")
		.build();
	final JdbcTableSource expected = JdbcTableSource.builder()
		.setOptions(options)
		.setSchema(schema)
		.build();

	TableSourceValidation.validateTableSource(expected, schema);
	TableSourceValidation.validateTableSource(actual, schema);
	assertEquals(expected, actual);
}
 
Example 7
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
	final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
	// Create the default configuration from the current runtime env (`hbase-site.xml` on the classpath) first.
	Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
	String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
	hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
	descriptorProperties
		.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
		.ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));

	String hTableName = descriptorProperties.getString(CONNECTOR_TABLE_NAME);
	TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(
		descriptorProperties.getTableSchema(SCHEMA));
	HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
	return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
}
 
Example 8
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
	final DescriptorProperties descriptorProperties = getValidatedProperties(properties);

	final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
	final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties);
	final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);

	return createKafkaTableSource(
		TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA)),
		SchemaValidator.deriveProctimeAttribute(descriptorProperties),
		SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
		SchemaValidator.deriveFieldMapping(
			descriptorProperties,
			Optional.of(deserializationSchema.getProducedType())),
		topic,
		getKafkaProperties(descriptorProperties),
		deserializationSchema,
		startupOptions.startupMode,
		startupOptions.specificOffsets,
		startupOptions.startupTimestampMillis);
}
 
Example 9
private Optional<TableSource<?>> findAndCreateTableSource() {
	Optional<TableSource<?>> tableSource = Optional.empty();
	try {
		if (lookupResult.getTable() instanceof CatalogTable) {
			// Use an empty config for TableSourceFactoryContextImpl since we can't fetch the
			// actual TableConfig here; currently the empty config does not affect the logic.
			ReadableConfig config = new Configuration();
			TableSourceFactory.Context context =
				new TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) lookupResult.getTable(), config);
			TableSource<?> source = TableFactoryUtil.findAndCreateTableSource(context);
			if (source instanceof StreamTableSource) {
				if (!isStreamingMode && !((StreamTableSource<?>) source).isBounded()) {
					throw new ValidationException("Cannot query on an unbounded source in batch mode, but " +
						tableIdentifier.asSummaryString() + " is unbounded.");
				}
				tableSource = Optional.of(source);
			} else {
				throw new ValidationException("Catalog tables only support " +
					"StreamTableSource and InputFormatTableSource.");
			}
		}
	} catch (Exception e) {
		tableSource = Optional.empty();
	}
	return tableSource;
}
 
Example 10
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
	final DescriptorProperties params = new DescriptorProperties(true);
	params.putProperties(properties);
	final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
	final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
	return new TestTableSource(
		params.getTableSchema(SCHEMA()),
		properties.get(testProperty),
		proctime.orElse(null),
		rowtime);
}
 
Example 11
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
	final DescriptorProperties descriptorProperties = getValidatedProperties(properties);

	return JDBCTableSource.builder()
		.setOptions(getJDBCOptions(descriptorProperties))
		.setReadOptions(getJDBCReadOptions(descriptorProperties))
		.setLookupOptions(getJDBCLookupOptions(descriptorProperties))
		.setSchema(descriptorProperties.getTableSchema(SCHEMA))
		.build();
}
 
Example 12
@Test
public void testJDBCCommonProperties() {
	Map<String, String> properties = getBasicProperties();
	properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver");
	properties.put("connector.username", "user");
	properties.put("connector.password", "pass");

	final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
		.createStreamTableSource(properties);

	final JDBCOptions options = JDBCOptions.builder()
		.setDBUrl("jdbc:derby:memory:mydb")
		.setTableName("mytable")
		.setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
		.setUsername("user")
		.setPassword("pass")
		.build();
	final TableSchema schema = TableSchema.builder()
		.field("aaa", DataTypes.INT())
		.field("bbb", DataTypes.STRING())
		.field("ccc", DataTypes.DOUBLE())
		.build();
	final JDBCTableSource expected = JDBCTableSource.builder()
		.setOptions(options)
		.setSchema(schema)
		.build();

	assertEquals(expected, actual);
}
 
Example 13
@Test
public void testJDBCReadProperties() {
	Map<String, String> properties = getBasicProperties();
	properties.put("connector.read.partition.column", "aaa");
	properties.put("connector.read.partition.lower-bound", "-10");
	properties.put("connector.read.partition.upper-bound", "100");
	properties.put("connector.read.partition.num", "10");
	properties.put("connector.read.fetch-size", "20");

	final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
		.createStreamTableSource(properties);

	final JDBCOptions options = JDBCOptions.builder()
		.setDBUrl("jdbc:derby:memory:mydb")
		.setTableName("mytable")
		.build();
	final JDBCReadOptions readOptions = JDBCReadOptions.builder()
		.setPartitionColumnName("aaa")
		.setPartitionLowerBound(-10)
		.setPartitionUpperBound(100)
		.setNumPartitions(10)
		.setFetchSize(20)
		.build();
	final TableSchema schema = TableSchema.builder()
		.field("aaa", DataTypes.INT())
		.field("bbb", DataTypes.STRING())
		.field("ccc", DataTypes.DOUBLE())
		.build();
	final JDBCTableSource expected = JDBCTableSource.builder()
		.setOptions(options)
		.setReadOptions(readOptions)
		.setSchema(schema)
		.build();

	assertEquals(expected, actual);
}
 
Example 14
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
	TableSource<?> tableSource;
	Optional<TableFactory> tableFactory = catalog.getTableFactory();
	if (tableFactory.isPresent()) {
		TableFactory tf = tableFactory.get();
		if (tf instanceof TableSourceFactory) {
			tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table);
		} else {
			throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
				catalog.getClass()));
		}
	} else {
		tableSource = TableFactoryUtil.findAndCreateTableSource(table);
	}

	if (!(tableSource instanceof StreamTableSource)) {
		throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
	}

	return new TableSourceTable<>(
		tableSource,
		// this flag is only needed for the legacy planner; the Blink planner should use
		// the information that comes from the TableSource itself to determine whether
		// it is a streaming or batch source.
		isStreamingMode,
		FlinkStatistic.UNKNOWN()
	);
}
 
Example 15
private Table convertConnectorTable(
		ConnectorCatalogTable<?, ?> table,
		ObjectPath tablePath) throws TableNotExistException {
	if (table.getTableSource().isPresent()) {
		TableSource<?> tableSource = table.getTableSource().get();
		if (!(tableSource instanceof StreamTableSource ||
				tableSource instanceof LookupableTableSource)) {
			throw new TableException(
					"Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
		}
		if (!isStreamingMode && tableSource instanceof StreamTableSource &&
				!((StreamTableSource<?>) tableSource).isBounded()) {
			throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
		}

		TableStats tableStats = TableStats.UNKNOWN;
		// TODO: support stats for partitionable tables
		if (!table.isPartitioned()) {
			CatalogTableStatistics tableStatistics = catalog.getTableStatistics(tablePath);
			CatalogColumnStatistics columnStatistics = catalog.getTableColumnStatistics(tablePath);
			tableStats = convertToTableStats(tableStatistics, columnStatistics);
		}
		return new TableSourceTable<>(
				tableSource,
				isStreamingMode,
				FlinkStatistic.builder().tableStats(tableStats).build());
	} else {
		Optional<TableSinkTable> tableSinkTable = table.getTableSink()
			.map(tableSink -> new TableSinkTable<>(
				tableSink,
				FlinkStatistic.UNKNOWN()));
		if (tableSinkTable.isPresent()) {
			return tableSinkTable.get();
		} else {
			throw new TableException("Cannot convert a connector table " +
				"without either source or sink.");
		}
	}
}
 
Example 16
public RichTableSourceQueryOperation(
		TableSource<T> tableSource,
		FlinkStatistic statistic) {
	super(tableSource, false);
	Preconditions.checkArgument(tableSource instanceof StreamTableSource,
			"Blink planner should always use StreamTableSource.");
	this.statistic = statistic;
}
 
Example 17
@Override
public <U> RelNode visit(TableSourceQueryOperation<U> tableSourceOperation) {
	TableSource<?> tableSource = tableSourceOperation.getTableSource();
	boolean isBatch;
	if (tableSource instanceof LookupableTableSource) {
		isBatch = tableSourceOperation.isBatch();
	} else if (tableSource instanceof StreamTableSource) {
		isBatch = ((StreamTableSource<?>) tableSource).isBounded();
	} else {
		throw new TableException(String.format("%s is not supported.", tableSource.getClass().getSimpleName()));
	}

	FlinkStatistic statistic;
	List<String> names;
	if (tableSourceOperation instanceof RichTableSourceQueryOperation &&
		((RichTableSourceQueryOperation<U>) tableSourceOperation).getQualifiedName() != null) {
		statistic = ((RichTableSourceQueryOperation<U>) tableSourceOperation).getStatistic();
		names = ((RichTableSourceQueryOperation<U>) tableSourceOperation).getQualifiedName();
	} else {
		statistic = FlinkStatistic.UNKNOWN();
		// TableSourceScan requires a unique name for a Table to compute its digest.
		// We are using the identity hash of the TableSource object.
		String refId = "Unregistered_TableSource_" + System.identityHashCode(tableSource);
		names = Collections.singletonList(refId);
	}

	TableSourceTable<?> tableSourceTable = new TableSourceTable<>(tableSource, !isBatch, statistic);
	FlinkRelOptTable table = FlinkRelOptTable.create(
		relBuilder.getRelOptSchema(),
		tableSourceTable.getRowType(relBuilder.getTypeFactory()),
		names,
		tableSourceTable);
	return LogicalTableScan.create(relBuilder.getCluster(), table);
}
 
Example 18
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
	final DescriptorProperties params = new DescriptorProperties(true);
	params.putProperties(properties);
	final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
	final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
	return new TestTableSource(
		params.getTableSchema(SCHEMA),
		properties.get(testProperty),
		proctime.orElse(null),
		rowtime);
}
 
Example 19
@Override
public StreamTableSource<Row> create(Map<String, String> properties) {
  DescriptorProperties params = new DescriptorProperties(true);
  params.putProperties(properties);
  TableSchema schema = params.getTableSchema(TOPIC_SCHEMA_KEY);
  String topic = params.getString(TOPIC_NAME_KEY);
  Properties conf = new Properties();
  conf.putAll(params.getPrefix(KAFKA_CONFIG_PREFIX));
  return new JsonTableSource(topic, conf, schema);
}
 
Example 20
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
	DescriptorProperties descriptorProperties = getValidatedProperties(properties);
	TableSchema schema = TableSchemaUtils.getPhysicalSchema(
		descriptorProperties.getTableSchema(SCHEMA));

	return JdbcTableSource.builder()
		.setOptions(getJdbcOptions(descriptorProperties))
		.setReadOptions(getJdbcReadOptions(descriptorProperties))
		.setLookupOptions(getJdbcLookupOptions(descriptorProperties))
		.setSchema(schema)
		.build();
}
 
Example 21
@Test
public void testJdbcReadProperties() {
	Map<String, String> properties = getBasicProperties();
	properties.put("connector.read.query", "SELECT aaa FROM mytable");
	properties.put("connector.read.partition.column", "aaa");
	properties.put("connector.read.partition.lower-bound", "-10");
	properties.put("connector.read.partition.upper-bound", "100");
	properties.put("connector.read.partition.num", "10");
	properties.put("connector.read.fetch-size", "20");

	final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
		.createStreamTableSource(properties);

	final JdbcOptions options = JdbcOptions.builder()
		.setDBUrl("jdbc:derby:memory:mydb")
		.setTableName("mytable")
		.build();
	final JdbcReadOptions readOptions = JdbcReadOptions.builder()
		.setQuery("SELECT aaa FROM mytable")
		.setPartitionColumnName("aaa")
		.setPartitionLowerBound(-10)
		.setPartitionUpperBound(100)
		.setNumPartitions(10)
		.setFetchSize(20)
		.build();
	final JdbcTableSource expected = JdbcTableSource.builder()
		.setOptions(options)
		.setReadOptions(readOptions)
		.setSchema(schema)
		.build();

	assertEquals(expected, actual);
}
 
Example 22
@Test
public void testGenericTable() throws Exception {
	TableSchema schema = TableSchema.builder()
		.field("name", DataTypes.STRING())
		.field("age", DataTypes.INT())
		.build();

	Map<String, String> properties = new HashMap<>();
	properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
	properties.put("connector", "COLLECTION");

	catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
	ObjectPath path = new ObjectPath("mydb", "mytable");
	CatalogTable table = new CatalogTableImpl(schema, properties, "csv table");
	catalog.createTable(path, table, true);
	Optional<TableFactory> opt = catalog.getTableFactory();
	assertTrue(opt.isPresent());
	HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
	TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
			ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
	assertTrue(tableSource instanceof StreamTableSource);
	TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl(
			ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
			table,
			new Configuration(),
			true));
	assertTrue(tableSink instanceof StreamTableSink);
}
 
Example 23
private Table convertCatalogTable(
		ObjectIdentifier identifier,
		CatalogTable table,
		TableSchema resolvedSchema,
		@Nullable TableFactory tableFactory) {
	final TableSource<?> tableSource;
	final TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
			identifier, table, tableConfig.getConfiguration());
	if (tableFactory != null) {
		if (tableFactory instanceof TableSourceFactory) {
			tableSource = ((TableSourceFactory<?>) tableFactory).createTableSource(context);
		} else {
			throw new TableException(
				"Cannot query a sink-only table. TableFactory provided by catalog must implement TableSourceFactory");
		}
	} else {
		tableSource = TableFactoryUtil.findAndCreateTableSource(context);
	}

	if (!(tableSource instanceof StreamTableSource)) {
		throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
	}

	return new TableSourceTable<>(
		resolvedSchema,
		tableSource,
		// this flag is only needed for the legacy planner; the Blink planner should use
		// the information that comes from the TableSource itself to determine whether
		// it is a streaming or batch source.
		isStreamingMode,
		FlinkStatistic.UNKNOWN()
	);
}
 
Example 24
private TestTable(
		String fullyQualifiedPath,
		TableSchema tableSchema,
		boolean isTemporary) {
	super(new StreamTableSource<Row>() {
		@Override
		public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
			return null;
		}

		@Override
		public DataType getProducedDataType() {
			return tableSchema.toRowDataType();
		}

		@Override
		public TableSchema getTableSchema() {
			throw new UnsupportedOperationException("Should not be called");
		}

		@Override
		public String explainSource() {
			return String.format("isTemporary=[%s]", isTemporary);
		}
	}, null, tableSchema, false);

	this.fullyQualifiedPath = fullyQualifiedPath;
	this.isTemporary = isTemporary;
}
 
Example 25
/**
 * Only creates a stream table source.
 */
@Override
default TableSource<T> createTableSource(Map<String, String> properties) {
	StreamTableSource<T> source = createStreamTableSource(properties);
	if (source == null) {
		throw new ValidationException(
				"Please override 'createTableSource(Context)' method.");
	}
	return source;
}
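
The default method above bridges the old property-map factory to the newer context-based one. For orientation, here is a hedged sketch of the full factory skeleton that the createStreamTableSource(...) methods in these examples live in, assuming the legacy TableFactory discovery SPI; the connector type "my-connector" and the reuse of SingleRowTableSource from the sketch at the top are assumptions for illustration, not part of any shipped connector.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class MyTableSourceFactory implements StreamTableSourceFactory<Row> {

	@Override
	public Map<String, String> requiredContext() {
		// TableFactoryService matches these entries against the given properties
		Map<String, String> context = new HashMap<>();
		context.put("connector.type", "my-connector"); // hypothetical connector type
		return context;
	}

	@Override
	public List<String> supportedProperties() {
		// "schema.*" acts as a wildcard so arbitrary schema keys pass validation
		return Arrays.asList("schema.*");
	}

	@Override
	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
		// a real factory would validate the properties and derive the schema,
		// as the DescriptorProperties-based examples above do
		return new SingleRowTableSource();
	}
}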
 
Example 26
public RichTableSourceQueryOperation(
		ObjectIdentifier identifier,
		TableSource<T> tableSource,
		FlinkStatistic statistic) {
	super(tableSource, false);
	Preconditions.checkArgument(tableSource instanceof StreamTableSource,
			"Blink planner should always use StreamTableSource.");
	this.statistic = statistic;
	this.identifier = identifier;
}
 
Example 27
private static FlinkPreparingTableBase convertSourceTable(
		RelOptSchema relOptSchema,
		RelDataType rowType,
		ObjectIdentifier tableIdentifier,
		ConnectorCatalogTable<?, ?> table,
		FlinkStatistic statistic,
		boolean isStreamingMode) {
	TableSource<?> tableSource = table.getTableSource().get();
	if (!(tableSource instanceof StreamTableSource ||
		tableSource instanceof LookupableTableSource)) {
		throw new ValidationException(
			"Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
	}
	if (!isStreamingMode && tableSource instanceof StreamTableSource &&
		!((StreamTableSource<?>) tableSource).isBounded()) {
		throw new ValidationException("Only bounded StreamTableSource can be used in batch mode.");
	}

	return new LegacyTableSourceTable<>(
		relOptSchema,
		tableIdentifier,
		rowType,
		statistic,
		tableSource,
		isStreamingMode,
		table);
}
 
Example 28
@Override
public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
	TableSchema schema = context.getTable().getSchema();
	final DescriptorProperties params = new DescriptorProperties(true);
	params.putProperties(context.getTable().toProperties());
	final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
	final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
	return new TestTableSource(
		schema,
		context.getTable().getProperties().get(testProperty),
		proctime.orElse(null),
		rowtime);
}
 
Example 29
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
	String database = properties.getOrDefault(
		CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE,
		"default_database");
	GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog(name, database);

	String tableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME);
	StreamTableSource<Row> tableSource = new StreamTableSource<Row>() {
		@Override
		public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
			return execEnv.fromCollection(TABLE_CONTENTS)
				.returns(new RowTypeInfo(
					new TypeInformation[]{Types.INT(), Types.STRING()},
					new String[]{"id", "string"}));
		}

		@Override
		public TableSchema getTableSchema() {
			return TableSchema.builder()
				.field("id", DataTypes.INT())
				.field("string", DataTypes.STRING())
				.build();
		}

		@Override
		public DataType getProducedDataType() {
			return DataTypes.ROW(
				DataTypes.FIELD("id", DataTypes.INT()),
				DataTypes.FIELD("string", DataTypes.STRING())
			);
		}
	};

	try {
		genericInMemoryCatalog.createTable(
			new ObjectPath(database, tableName),
			ConnectorCatalogTable.source(tableSource, false),
			false
		);
	} catch (Exception e) {
		throw new WrappingRuntimeException(e);
	}

	return genericInMemoryCatalog;
}
 
Example 30
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
    return createFlinkPravegaTableSource(properties);
}
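
To close, a hedged usage sketch of how a source produced by one of these factories, or the hand-written SingleRowTableSource above, was typically registered and queried under the legacy planner; registerTableSource is deprecated in later Flink releases, and the table name "my_table" is an assumption.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamTableSourceUsage {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

		// register the legacy source under a name, then reference it from SQL
		tEnv.registerTableSource("my_table", new SingleRowTableSource());
		Table result = tEnv.sqlQuery("SELECT id, msg FROM my_table");

		// convert back to a DataStream and print; execute() triggers the pipeline
		tEnv.toAppendStream(result, Row.class).print();
		env.execute("StreamTableSource usage");
	}
}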