Java源码示例:org.apache.flink.table.sources.StreamTableSource
示例1
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties);
final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);
return createKafkaTableSource(
descriptorProperties.getTableSchema(SCHEMA()),
SchemaValidator.deriveProctimeAttribute(descriptorProperties),
SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
SchemaValidator.deriveFieldMapping(
descriptorProperties,
Optional.of(deserializationSchema.getProducedType())),
topic,
getKafkaProperties(descriptorProperties),
deserializationSchema,
startupOptions.startupMode,
startupOptions.specificOffsets);
}
示例2
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
Configuration hbaseClientConf = HBaseConfiguration.create();
String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
descriptorProperties
.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
.ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
String hTableName = descriptorProperties.getString(CONNECTOR_TABLE_NAME);
TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
}
示例3
@Test
public void testGenericTable() throws Exception {
TableSchema schema = TableSchema.builder()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.build();
Map<String, String> properties = new HashMap<>();
properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
properties.put("connector", "COLLECTION");
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
ObjectPath path = new ObjectPath("mydb", "mytable");
CatalogTable table = new CatalogTableImpl(schema, properties, "csv table");
catalog.createTable(path, table, true);
Optional<TableFactory> opt = catalog.getTableFactory();
assertTrue(opt.isPresent());
HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
TableSource tableSource = tableFactory.createTableSource(path, table);
assertTrue(tableSource instanceof StreamTableSource);
TableSink tableSink = tableFactory.createTableSink(path, table);
assertTrue(tableSink instanceof StreamTableSink);
}
示例4
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties);
final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);
return createKafkaTableSource(
descriptorProperties.getTableSchema(SCHEMA),
SchemaValidator.deriveProctimeAttribute(descriptorProperties),
SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
SchemaValidator.deriveFieldMapping(
descriptorProperties,
Optional.of(deserializationSchema.getProducedType())),
topic,
getKafkaProperties(descriptorProperties),
deserializationSchema,
startupOptions.startupMode,
startupOptions.specificOffsets);
}
示例5
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
TableSource<?> tableSource;
Optional<TableFactory> tableFactory = catalog.getTableFactory();
if (tableFactory.isPresent()) {
TableFactory tf = tableFactory.get();
if (tf instanceof TableSourceFactory) {
tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table);
} else {
throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
catalog.getClass()));
}
} else {
tableSource = TableFactoryUtil.findAndCreateTableSource(table);
}
if (!(tableSource instanceof StreamTableSource)) {
throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
}
return new TableSourceTable<>(
tableSource,
!((StreamTableSource<?>) tableSource).isBounded(),
FlinkStatistic.UNKNOWN()
);
}
示例6
@Test
public void testJdbcCommonProperties() {
Map<String, String> properties = getBasicProperties();
properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver");
properties.put("connector.username", "user");
properties.put("connector.password", "pass");
final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
.createStreamTableSource(properties);
final JdbcOptions options = JdbcOptions.builder()
.setDBUrl("jdbc:derby:memory:mydb")
.setTableName("mytable")
.setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
.setUsername("user")
.setPassword("pass")
.build();
final JdbcTableSource expected = JdbcTableSource.builder()
.setOptions(options)
.setSchema(schema)
.build();
TableSourceValidation.validateTableSource(expected, schema);
TableSourceValidation.validateTableSource(actual, schema);
assertEquals(expected, actual);
}
示例7
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
descriptorProperties
.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
.ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
String hTableName = descriptorProperties.getString(CONNECTOR_TABLE_NAME);
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(
descriptorProperties.getTableSchema(SCHEMA));
HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
}
示例8
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties);
final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);
return createKafkaTableSource(
TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA)),
SchemaValidator.deriveProctimeAttribute(descriptorProperties),
SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
SchemaValidator.deriveFieldMapping(
descriptorProperties,
Optional.of(deserializationSchema.getProducedType())),
topic,
getKafkaProperties(descriptorProperties),
deserializationSchema,
startupOptions.startupMode,
startupOptions.specificOffsets,
startupOptions.startupTimestampMillis);
}
示例9
private Optional<TableSource<?>> findAndCreateTableSource() {
Optional<TableSource<?>> tableSource = Optional.empty();
try {
if (lookupResult.getTable() instanceof CatalogTable) {
// Use an empty config for TableSourceFactoryContextImpl since we can't fetch the
// actual TableConfig here. And currently the empty config do not affect the logic.
ReadableConfig config = new Configuration();
TableSourceFactory.Context context =
new TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) lookupResult.getTable(), config);
TableSource<?> source = TableFactoryUtil.findAndCreateTableSource(context);
if (source instanceof StreamTableSource) {
if (!isStreamingMode && !((StreamTableSource<?>) source).isBounded()) {
throw new ValidationException("Cannot query on an unbounded source in batch mode, but " +
tableIdentifier.asSummaryString() + " is unbounded.");
}
tableSource = Optional.of(source);
} else {
throw new ValidationException("Catalog tables only support " +
"StreamTableSource and InputFormatTableSource.");
}
}
} catch (Exception e) {
tableSource = Optional.empty();
}
return tableSource;
}
示例10
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);
final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
return new TestTableSource(
params.getTableSchema(SCHEMA()),
properties.get(testProperty),
proctime.orElse(null),
rowtime);
}
示例11
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
return JDBCTableSource.builder()
.setOptions(getJDBCOptions(descriptorProperties))
.setReadOptions(getJDBCReadOptions(descriptorProperties))
.setLookupOptions(getJDBCLookupOptions(descriptorProperties))
.setSchema(descriptorProperties.getTableSchema(SCHEMA))
.build();
}
示例12
@Test
public void testJDBCCommonProperties() {
Map<String, String> properties = getBasicProperties();
properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver");
properties.put("connector.username", "user");
properties.put("connector.password", "pass");
final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
.createStreamTableSource(properties);
final JDBCOptions options = JDBCOptions.builder()
.setDBUrl("jdbc:derby:memory:mydb")
.setTableName("mytable")
.setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
.setUsername("user")
.setPassword("pass")
.build();
final TableSchema schema = TableSchema.builder()
.field("aaa", DataTypes.INT())
.field("bbb", DataTypes.STRING())
.field("ccc", DataTypes.DOUBLE())
.build();
final JDBCTableSource expected = JDBCTableSource.builder()
.setOptions(options)
.setSchema(schema)
.build();
assertEquals(expected, actual);
}
示例13
@Test
public void testJDBCReadProperties() {
Map<String, String> properties = getBasicProperties();
properties.put("connector.read.partition.column", "aaa");
properties.put("connector.read.partition.lower-bound", "-10");
properties.put("connector.read.partition.upper-bound", "100");
properties.put("connector.read.partition.num", "10");
properties.put("connector.read.fetch-size", "20");
final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
.createStreamTableSource(properties);
final JDBCOptions options = JDBCOptions.builder()
.setDBUrl("jdbc:derby:memory:mydb")
.setTableName("mytable")
.build();
final JDBCReadOptions readOptions = JDBCReadOptions.builder()
.setPartitionColumnName("aaa")
.setPartitionLowerBound(-10)
.setPartitionUpperBound(100)
.setNumPartitions(10)
.setFetchSize(20)
.build();
final TableSchema schema = TableSchema.builder()
.field("aaa", DataTypes.INT())
.field("bbb", DataTypes.STRING())
.field("ccc", DataTypes.DOUBLE())
.build();
final JDBCTableSource expected = JDBCTableSource.builder()
.setOptions(options)
.setReadOptions(readOptions)
.setSchema(schema)
.build();
assertEquals(expected, actual);
}
示例14
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
TableSource<?> tableSource;
Optional<TableFactory> tableFactory = catalog.getTableFactory();
if (tableFactory.isPresent()) {
TableFactory tf = tableFactory.get();
if (tf instanceof TableSourceFactory) {
tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table);
} else {
throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
catalog.getClass()));
}
} else {
tableSource = TableFactoryUtil.findAndCreateTableSource(table);
}
if (!(tableSource instanceof StreamTableSource)) {
throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
}
return new TableSourceTable<>(
tableSource,
// this means the TableSource extends from StreamTableSource, this is needed for the
// legacy Planner. Blink Planner should use the information that comes from the TableSource
// itself to determine if it is a streaming or batch source.
isStreamingMode,
FlinkStatistic.UNKNOWN()
);
}
示例15
private Table convertConnectorTable(
ConnectorCatalogTable<?, ?> table,
ObjectPath tablePath) throws TableNotExistException {
if (table.getTableSource().isPresent()) {
TableSource<?> tableSource = table.getTableSource().get();
if (!(tableSource instanceof StreamTableSource ||
tableSource instanceof LookupableTableSource)) {
throw new TableException(
"Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
}
if (!isStreamingMode && tableSource instanceof StreamTableSource &&
!((StreamTableSource<?>) tableSource).isBounded()) {
throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
}
TableStats tableStats = TableStats.UNKNOWN;
// TODO supports stats for partitionable table
if (!table.isPartitioned()) {
CatalogTableStatistics tableStatistics = catalog.getTableStatistics(tablePath);
CatalogColumnStatistics columnStatistics = catalog.getTableColumnStatistics(tablePath);
tableStats = convertToTableStats(tableStatistics, columnStatistics);
}
return new TableSourceTable<>(
tableSource,
isStreamingMode,
FlinkStatistic.builder().tableStats(tableStats).build());
} else {
Optional<TableSinkTable> tableSinkTable = table.getTableSink()
.map(tableSink -> new TableSinkTable<>(
tableSink,
FlinkStatistic.UNKNOWN()));
if (tableSinkTable.isPresent()) {
return tableSinkTable.get();
} else {
throw new TableException("Cannot convert a connector table " +
"without either source or sink.");
}
}
}
示例16
public RichTableSourceQueryOperation(
TableSource<T> tableSource,
FlinkStatistic statistic) {
super(tableSource, false);
Preconditions.checkArgument(tableSource instanceof StreamTableSource,
"Blink planner should always use StreamTableSource.");
this.statistic = statistic;
}
示例17
@Override
public <U> RelNode visit(TableSourceQueryOperation<U> tableSourceOperation) {
TableSource<?> tableSource = tableSourceOperation.getTableSource();
boolean isBatch;
if (tableSource instanceof LookupableTableSource) {
isBatch = tableSourceOperation.isBatch();
} else if (tableSource instanceof StreamTableSource) {
isBatch = ((StreamTableSource<?>) tableSource).isBounded();
} else {
throw new TableException(String.format("%s is not supported.", tableSource.getClass().getSimpleName()));
}
FlinkStatistic statistic;
List<String> names;
if (tableSourceOperation instanceof RichTableSourceQueryOperation &&
((RichTableSourceQueryOperation<U>) tableSourceOperation).getQualifiedName() != null) {
statistic = ((RichTableSourceQueryOperation<U>) tableSourceOperation).getStatistic();
names = ((RichTableSourceQueryOperation<U>) tableSourceOperation).getQualifiedName();
} else {
statistic = FlinkStatistic.UNKNOWN();
// TableSourceScan requires a unique name of a Table for computing a digest.
// We are using the identity hash of the TableSource object.
String refId = "Unregistered_TableSource_" + System.identityHashCode(tableSource);
names = Collections.singletonList(refId);
}
TableSourceTable<?> tableSourceTable = new TableSourceTable<>(tableSource, !isBatch, statistic);
FlinkRelOptTable table = FlinkRelOptTable.create(
relBuilder.getRelOptSchema(),
tableSourceTable.getRowType(relBuilder.getTypeFactory()),
names,
tableSourceTable);
return LogicalTableScan.create(relBuilder.getCluster(), table);
}
示例18
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);
final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
return new TestTableSource(
params.getTableSchema(SCHEMA),
properties.get(testProperty),
proctime.orElse(null),
rowtime);
}
示例19
@Override
public StreamTableSource<Row> create(Map<String, String> properties) {
DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);
TableSchema schema = params.getTableSchema(TOPIC_SCHEMA_KEY);
String topic = params.getString(TOPIC_NAME_KEY);
Properties conf = new Properties();
conf.putAll(params.getPrefix(KAFKA_CONFIG_PREFIX));
return new JsonTableSource(topic, conf, schema);
}
示例20
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
DescriptorProperties descriptorProperties = getValidatedProperties(properties);
TableSchema schema = TableSchemaUtils.getPhysicalSchema(
descriptorProperties.getTableSchema(SCHEMA));
return JdbcTableSource.builder()
.setOptions(getJdbcOptions(descriptorProperties))
.setReadOptions(getJdbcReadOptions(descriptorProperties))
.setLookupOptions(getJdbcLookupOptions(descriptorProperties))
.setSchema(schema)
.build();
}
示例21
@Test
public void testJdbcReadProperties() {
Map<String, String> properties = getBasicProperties();
properties.put("connector.read.query", "SELECT aaa FROM mytable");
properties.put("connector.read.partition.column", "aaa");
properties.put("connector.read.partition.lower-bound", "-10");
properties.put("connector.read.partition.upper-bound", "100");
properties.put("connector.read.partition.num", "10");
properties.put("connector.read.fetch-size", "20");
final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
.createStreamTableSource(properties);
final JdbcOptions options = JdbcOptions.builder()
.setDBUrl("jdbc:derby:memory:mydb")
.setTableName("mytable")
.build();
final JdbcReadOptions readOptions = JdbcReadOptions.builder()
.setQuery("SELECT aaa FROM mytable")
.setPartitionColumnName("aaa")
.setPartitionLowerBound(-10)
.setPartitionUpperBound(100)
.setNumPartitions(10)
.setFetchSize(20)
.build();
final JdbcTableSource expected = JdbcTableSource.builder()
.setOptions(options)
.setReadOptions(readOptions)
.setSchema(schema)
.build();
assertEquals(expected, actual);
}
示例22
@Test
public void testGenericTable() throws Exception {
TableSchema schema = TableSchema.builder()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.build();
Map<String, String> properties = new HashMap<>();
properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
properties.put("connector", "COLLECTION");
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
ObjectPath path = new ObjectPath("mydb", "mytable");
CatalogTable table = new CatalogTableImpl(schema, properties, "csv table");
catalog.createTable(path, table, true);
Optional<TableFactory> opt = catalog.getTableFactory();
assertTrue(opt.isPresent());
HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
assertTrue(tableSource instanceof StreamTableSource);
TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
table,
new Configuration(),
true));
assertTrue(tableSink instanceof StreamTableSink);
}
示例23
private Table convertCatalogTable(
ObjectIdentifier identifier,
CatalogTable table,
TableSchema resolvedSchema,
@Nullable TableFactory tableFactory) {
final TableSource<?> tableSource;
final TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
identifier, table, tableConfig.getConfiguration());
if (tableFactory != null) {
if (tableFactory instanceof TableSourceFactory) {
tableSource = ((TableSourceFactory<?>) tableFactory).createTableSource(context);
} else {
throw new TableException(
"Cannot query a sink-only table. TableFactory provided by catalog must implement TableSourceFactory");
}
} else {
tableSource = TableFactoryUtil.findAndCreateTableSource(context);
}
if (!(tableSource instanceof StreamTableSource)) {
throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
}
return new TableSourceTable<>(
resolvedSchema,
tableSource,
// this means the TableSource extends from StreamTableSource, this is needed for the
// legacy Planner. Blink Planner should use the information that comes from the TableSource
// itself to determine if it is a streaming or batch source.
isStreamingMode,
FlinkStatistic.UNKNOWN()
);
}
示例24
private TestTable(
String fullyQualifiedPath,
TableSchema tableSchema,
boolean isTemporary) {
super(new StreamTableSource<Row>() {
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
return null;
}
@Override
public DataType getProducedDataType() {
return tableSchema.toRowDataType();
}
@Override
public TableSchema getTableSchema() {
throw new UnsupportedOperationException("Should not be called");
}
@Override
public String explainSource() {
return String.format("isTemporary=[%s]", isTemporary);
}
}, null, tableSchema, false);
this.fullyQualifiedPath = fullyQualifiedPath;
this.isTemporary = isTemporary;
}
示例25
/**
* Only create a stream table source.
*/
@Override
default TableSource<T> createTableSource(Map<String, String> properties) {
StreamTableSource<T> source = createStreamTableSource(properties);
if (source == null) {
throw new ValidationException(
"Please override 'createTableSource(Context)' method.");
}
return source;
}
示例26
public RichTableSourceQueryOperation(
ObjectIdentifier identifier,
TableSource<T> tableSource,
FlinkStatistic statistic) {
super(tableSource, false);
Preconditions.checkArgument(tableSource instanceof StreamTableSource,
"Blink planner should always use StreamTableSource.");
this.statistic = statistic;
this.identifier = identifier;
}
示例27
private static FlinkPreparingTableBase convertSourceTable(
RelOptSchema relOptSchema,
RelDataType rowType,
ObjectIdentifier tableIdentifier,
ConnectorCatalogTable<?, ?> table,
FlinkStatistic statistic,
boolean isStreamingMode) {
TableSource<?> tableSource = table.getTableSource().get();
if (!(tableSource instanceof StreamTableSource ||
tableSource instanceof LookupableTableSource)) {
throw new ValidationException(
"Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
}
if (!isStreamingMode && tableSource instanceof StreamTableSource &&
!((StreamTableSource<?>) tableSource).isBounded()) {
throw new ValidationException("Only bounded StreamTableSource can be used in batch mode.");
}
return new LegacyTableSourceTable<>(
relOptSchema,
tableIdentifier,
rowType,
statistic,
tableSource,
isStreamingMode,
table);
}
示例28
@Override
public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
TableSchema schema = context.getTable().getSchema();
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(context.getTable().toProperties());
final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
return new TestTableSource(
schema,
context.getTable().getProperties().get(testProperty),
proctime.orElse(null),
rowtime);
}
示例29
@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
String database = properties.getOrDefault(
CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE,
"default_database");
GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog(name, database);
String tableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME);
StreamTableSource<Row> tableSource = new StreamTableSource<Row>() {
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
return execEnv.fromCollection(TABLE_CONTENTS)
.returns(new RowTypeInfo(
new TypeInformation[]{Types.INT(), Types.STRING()},
new String[]{"id", "string"}));
}
@Override
public TableSchema getTableSchema() {
return TableSchema.builder()
.field("id", DataTypes.INT())
.field("string", DataTypes.STRING())
.build();
}
@Override
public DataType getProducedDataType() {
return DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("string", DataTypes.STRING())
);
}
};
try {
genericInMemoryCatalog.createTable(
new ObjectPath(database, tableName),
ConnectorCatalogTable.source(tableSource, false),
false
);
} catch (Exception e) {
throw new WrappingRuntimeException(e);
}
return genericInMemoryCatalog;
}
示例30
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
return createFlinkPravegaTableSource(properties);
}