Java源码示例:org.apache.flink.table.factories.StreamTableSinkFactory
示例1
/**
* For sink, stream name information is mandatory.
*/
@Test (expected = IllegalStateException.class)
public void testMissingStreamNameForWriter() {
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("name");
final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
.withFormat(JSON)
.withSchema(SCHEMA)
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
fail("stream name validation failed");
}
示例2
/**
* For sink, routingkey-field-name information is mandatory.
*/
@Test (expected = ValidationException.class)
public void testMissingRoutingKeyForWriter() {
Pravega pravega = new Pravega();
Stream stream = Stream.of(SCOPE, STREAM);
pravega.tableSinkWriterBuilder()
.forStream(stream)
.withPravegaConfig(PRAVEGA_CONFIG);
final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
.withFormat(JSON)
.withSchema(SCHEMA)
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
fail("routingKey field name validation failed");
}
示例3
@Test (expected = ValidationException.class)
public void testInvalidWriterMode() {
Pravega pravega = new Pravega();
Stream stream = Stream.of(SCOPE, STREAM);
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("name")
.forStream(stream)
.withPravegaConfig(PRAVEGA_CONFIG);
final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
.withFormat(JSON)
.withSchema(SCHEMA)
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
Map<String, String> test = new HashMap<>(propertiesMap);
test.put(CONNECTOR_WRITER_MODE, "foo");
TableFactoryService.find(StreamTableSinkFactory.class, test)
.createStreamTableSink(test);
fail("writer mode validation failed");
}
示例4
@Test
public void testValidWriterModeAtleastOnce() {
Pravega pravega = new Pravega();
Stream stream = Stream.of(SCOPE, STREAM);
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("name").withWriterMode(PravegaWriterMode.ATLEAST_ONCE)
.forStream(stream)
.withPravegaConfig(PRAVEGA_CONFIG);
final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
.withFormat(JSON)
.withSchema(SCHEMA)
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertNotNull(sink);
}
示例5
@Test
public void testValidWriterModeExactlyOnce() {
Pravega pravega = new Pravega();
Stream stream = Stream.of(SCOPE, STREAM);
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("name").withWriterMode(PravegaWriterMode.EXACTLY_ONCE)
.forStream(stream)
.withPravegaConfig(PRAVEGA_CONFIG);
final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
.withFormat(JSON)
.withSchema(SCHEMA)
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertNotNull(sink);
}
示例6
@Test (expected = ValidationException.class)
public void testMissingFormatDefinition() {
Pravega pravega = new Pravega();
Stream stream = Stream.of(SCOPE, STREAM);
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("name")
.forStream(stream)
.withPravegaConfig(PRAVEGA_CONFIG);
final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
.withSchema(SCHEMA)
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
fail("table factory validation failed");
}
示例7
@Test (expected = ValidationException.class)
public void testMissingSchemaDefinition() {
Pravega pravega = new Pravega();
Stream stream = Stream.of(SCOPE, STREAM);
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("name")
.forStream(stream)
.withPravegaConfig(PRAVEGA_CONFIG);
final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
.withFormat(JSON)
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
fail("missing schema validation failed");
}
示例8
@Test
public void testTableSink() {
// prepare parameters for Elasticsearch table sink
final TableSchema schema = createTestSchema();
final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions(),
IndexGeneratorFactory.createIndexGenerator(INDEX, schema));
// construct table sink using descriptors and table sink factory
final Map<String, String> elasticSearchProperties = createElasticSearchProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, elasticSearchProperties)
.createStreamTableSink(elasticSearchProperties);
assertEquals(expectedSink, actualSink);
}
示例9
@Test
public void testTableSinkWithLegacyProperties() {
// prepare parameters for Elasticsearch table sink
final TableSchema schema = createTestSchema();
final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions(),
IndexGeneratorFactory.createIndexGenerator(INDEX, schema));
// construct table sink using descriptors and table sink factory
final Map<String, String> elasticSearchProperties = createElasticSearchProperties();
final Map<String, String> legacyPropertiesMap = new HashMap<>();
legacyPropertiesMap.putAll(elasticSearchProperties);
// use legacy properties
legacyPropertiesMap.remove("connector.hosts");
legacyPropertiesMap.put("connector.hosts.0.hostname", "host1");
legacyPropertiesMap.put("connector.hosts.0.port", "1234");
legacyPropertiesMap.put("connector.hosts.0.protocol", "https");
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
.createStreamTableSink(legacyPropertiesMap);
assertEquals(expectedSink, actualSink);
}
示例10
/**
* This test can be unified with the corresponding source test once we have fixed FLINK-9870.
*/
@Test
public void testTableSink() {
// prepare parameters for Kafka table sink
final TableSchema schema = TableSchema.builder()
.field(FRUIT_NAME, DataTypes.STRING())
.field(COUNT, DataTypes.DECIMAL(10, 4))
.field(EVENT_TIME, DataTypes.TIMESTAMP(3))
.build();
final KafkaTableSinkBase expected = getExpectedKafkaTableSink(
schema,
TOPIC,
KAFKA_PROPERTIES,
Optional.of(new FlinkFixedPartitioner<>()),
new TestSerializationSchema(schema.toRowType()));
// construct table sink using descriptors and table sink factory
final Map<String, String> propertiesMap = createKafkaSinkProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertEquals(expected, actualSink);
// test Kafka producer
final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
actualKafkaSink.consumeDataStream(streamMock);
assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}
示例11
@Test
public void testTableSink() {
// prepare parameters for Elasticsearch table sink
final TableSchema schema = createTestSchema();
final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
new JsonRowSerializationSchema(schema.toRowType()),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions());
// construct table sink using descriptors and table sink factory
final TestTableDescriptor testDesc = new TestTableDescriptor(
new Elasticsearch()
.version(getElasticsearchVersion())
.host(HOSTNAME, PORT, SCHEMA)
.index(INDEX)
.documentType(DOC_TYPE)
.keyDelimiter(KEY_DELIMITER)
.keyNullLiteral(KEY_NULL_LITERAL)
.bulkFlushBackoffExponential()
.bulkFlushBackoffDelay(123L)
.bulkFlushBackoffMaxRetries(3)
.bulkFlushInterval(100L)
.bulkFlushMaxActions(1000)
.bulkFlushMaxSize("1 MB")
.failureHandlerCustom(DummyFailureHandler.class)
.connectionMaxRetryTimeout(100)
.connectionPathPrefix("/myapp"))
.withFormat(
new Json()
.deriveSchema())
.withSchema(
new Schema()
.field(FIELD_KEY, Types.LONG())
.field(FIELD_FRUIT_NAME, Types.STRING())
.field(FIELD_COUNT, Types.DECIMAL())
.field(FIELD_TS, Types.SQL_TIMESTAMP()))
.inUpsertMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertEquals(expectedSink, actualSink);
}
示例12
/**
* This test can be unified with the corresponding source test once we have fixed FLINK-9870.
*/
@Test
public void testTableSink() {
// prepare parameters for Kafka table sink
final TableSchema schema = TableSchema.builder()
.field(FRUIT_NAME, Types.STRING())
.field(COUNT, Types.DECIMAL())
.field(EVENT_TIME, Types.SQL_TIMESTAMP())
.build();
final KafkaTableSinkBase expected = getExpectedKafkaTableSink(
schema,
TOPIC,
KAFKA_PROPERTIES,
Optional.of(new FlinkFixedPartitioner<>()),
new TestSerializationSchema(schema.toRowType()));
// construct table sink using descriptors and table sink factory
final TestTableDescriptor testDesc = new TestTableDescriptor(
new Kafka()
.version(getKafkaVersion())
.topic(TOPIC)
.properties(KAFKA_PROPERTIES)
.sinkPartitionerFixed()
.startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed
.withFormat(new TestTableFormat())
.withSchema(
new Schema()
.field(FRUIT_NAME, Types.STRING())
.field(COUNT, Types.DECIMAL())
.field(EVENT_TIME, Types.SQL_TIMESTAMP()))
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertEquals(expected, actualSink);
// test Kafka producer
final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
actualKafkaSink.emitDataStream(streamMock);
assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}
示例13
@Test
public void testTableSink() {
// prepare parameters for Elasticsearch table sink
final TableSchema schema = createTestSchema();
final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
new JsonRowSerializationSchema(schema.toRowType()),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions());
// construct table sink using descriptors and table sink factory
final TestTableDescriptor testDesc = new TestTableDescriptor(
new Elasticsearch()
.version(getElasticsearchVersion())
.host(HOSTNAME, PORT, SCHEMA)
.index(INDEX)
.documentType(DOC_TYPE)
.keyDelimiter(KEY_DELIMITER)
.keyNullLiteral(KEY_NULL_LITERAL)
.bulkFlushBackoffExponential()
.bulkFlushBackoffDelay(123L)
.bulkFlushBackoffMaxRetries(3)
.bulkFlushInterval(100L)
.bulkFlushMaxActions(1000)
.bulkFlushMaxSize("1 MB")
.failureHandlerCustom(DummyFailureHandler.class)
.connectionMaxRetryTimeout(100)
.connectionPathPrefix("/myapp"))
.withFormat(
new Json()
.deriveSchema())
.withSchema(
new Schema()
.field(FIELD_KEY, Types.LONG())
.field(FIELD_FRUIT_NAME, Types.STRING())
.field(FIELD_COUNT, Types.DECIMAL())
.field(FIELD_TS, Types.SQL_TIMESTAMP()))
.inUpsertMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertEquals(expectedSink, actualSink);
}
示例14
/**
* This test can be unified with the corresponding source test once we have fixed FLINK-9870.
*/
@Test
public void testTableSink() {
// prepare parameters for Kafka table sink
final TableSchema schema = TableSchema.builder()
.field(FRUIT_NAME, Types.STRING())
.field(COUNT, Types.DECIMAL())
.field(EVENT_TIME, Types.SQL_TIMESTAMP())
.build();
final KafkaTableSinkBase expected = getExpectedKafkaTableSink(
schema,
TOPIC,
KAFKA_PROPERTIES,
Optional.of(new FlinkFixedPartitioner<>()),
new TestSerializationSchema(schema.toRowType()));
// construct table sink using descriptors and table sink factory
final TestTableDescriptor testDesc = new TestTableDescriptor(
new Kafka()
.version(getKafkaVersion())
.topic(TOPIC)
.properties(KAFKA_PROPERTIES)
.sinkPartitionerFixed()
.startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed
.withFormat(new TestTableFormat())
.withSchema(
new Schema()
.field(FRUIT_NAME, Types.STRING())
.field(COUNT, Types.DECIMAL())
.field(EVENT_TIME, Types.SQL_TIMESTAMP()))
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertEquals(expected, actualSink);
// test Kafka producer
final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
actualKafkaSink.emitDataStream(streamMock);
assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}
示例15
/**
* Validates the use of Pravega Table Descriptor to generate the source/sink Table factory to
* write and read from Pravega stream using {@link StreamTableEnvironment}
* @throws Exception
*/
@Test
public void testStreamingTableUsingDescriptor() throws Exception {
final String scope = setupUtils.getScope();
final String streamName = "stream";
Stream stream = Stream.of(scope, streamName);
this.setupUtils.createTestStream(stream.getStreamName(), 1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance()
// watermark is only supported in blink planner
.useBlinkPlanner()
.inStreamingMode()
.build());
PravegaConfig pravegaConfig = setupUtils.getPravegaConfig();
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("category")
.forStream(stream)
.withPravegaConfig(pravegaConfig);
pravega.tableSourceReaderBuilder()
.withReaderGroupScope(stream.getScope())
.forStream(stream)
.withPravegaConfig(pravegaConfig);
TableSchema tableSchema = TableSchema.builder()
.field("category", DataTypes.STRING())
.field("value", DataTypes.INT())
.build();
Schema schema = new Schema().schema(tableSchema);
ConnectTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(
new Json()
.failOnMissingField(false)
)
.withSchema(schema)
.inAppendMode();
desc.createTemporaryTable("test");
final Map<String, String> propertiesMap = desc.toProperties();
final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
final TableSource<?> source = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
.createStreamTableSource(propertiesMap);
Table table = tableEnv.fromDataStream(env.fromCollection(SAMPLES));
String tablePathSink = tableEnv.getCurrentDatabase() + "." + "PravegaSink";
ConnectorCatalogTable<?, ?> connectorCatalogSinkTable = ConnectorCatalogTable.sink(sink, false);
tableEnv.getCatalog(tableEnv.getCurrentCatalog())
.get()
.createTable(
ObjectPath.fromString(tablePathSink),
connectorCatalogSinkTable, false);
table.insertInto("PravegaSink");
ConnectorCatalogTable<?, ?> connectorCatalogSourceTable = ConnectorCatalogTable.source(source, false);
String tablePathSource = tableEnv.getCurrentDatabase() + "." + "samples";
tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get().createTable(
ObjectPath.fromString(tablePathSource),
connectorCatalogSourceTable, false);
// select some sample data from the Pravega-backed table, as a view
Table view = tableEnv.sqlQuery("SELECT * FROM samples WHERE category IN ('A','B')");
// write the view to a test sink that verifies the data for test purposes
tableEnv.toAppendStream(view, SampleRecord.class).addSink(new TestSink(SAMPLES));
// execute the topology
try {
env.execute();
Assert.fail("expected an exception");
} catch (Exception e) {
// we expect the job to fail because the test sink throws a deliberate exception.
Assert.assertTrue(ExceptionUtils.getRootCause(e) instanceof TestCompletionException);
}
}
示例16
@Test
public void testStreamTableSinkUsingDescriptor() throws Exception {
// create a Pravega stream for test purposes
Stream stream = Stream.of(setupUtils.getScope(), "testStreamTableSinkUsingDescriptor");
this.setupUtils.createTestStream(stream.getStreamName(), 1);
// create a Flink Table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance()
// watermark is only supported in blink planner
.useBlinkPlanner()
.inStreamingMode()
.build());
Table table = tableEnv.fromDataStream(env.fromCollection(SAMPLES));
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("category")
.forStream(stream)
.withPravegaConfig(setupUtils.getPravegaConfig());
ConnectTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true))
.withSchema(new Schema().
field("category", DataTypes.STRING())
.field("value", DataTypes.INT()))
.inAppendMode();
desc.createTemporaryTable("test");
final Map<String, String> propertiesMap = desc.toProperties();
final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
String tablePath = tableEnv.getCurrentDatabase() + "." + "PravegaSink";
ConnectorCatalogTable<?, ?> connectorCatalogTable = ConnectorCatalogTable.sink(sink, false);
tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get().createTable(
ObjectPath.fromString(tablePath),
connectorCatalogTable, false);
table.insertInto("PravegaSink");
env.execute();
}
示例17
@Test
public void testStreamTableSinkUsingDescriptorWithWatermark() throws Exception {
// create a Pravega stream for test purposes
Stream stream = Stream.of(setupUtils.getScope(), "testStreamTableSinkUsingDescriptorWithWatermark");
this.setupUtils.createTestStream(stream.getStreamName(), 1);
// create a Flink Table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance()
// watermark is only supported in blink planner
.useBlinkPlanner()
.inStreamingMode()
.build());
DataStream<SampleRecordWithTimestamp> dataStream = env.fromCollection(SAMPLES)
.map(SampleRecordWithTimestamp::new)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SampleRecordWithTimestamp>() {
@Override
public long extractAscendingTimestamp(SampleRecordWithTimestamp sampleRecordWithTimestamp) {
return sampleRecordWithTimestamp.getTimestamp();
}
});
Table table = tableEnv.fromDataStream(dataStream, "category, value, UserActionTime.rowtime");
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("category")
.enableWatermark(true)
.forStream(stream)
.withPravegaConfig(setupUtils.getPravegaConfig());
TableSchema tableSchema = TableSchema.builder()
.field("category", DataTypes.STRING())
.field("value", DataTypes.INT())
.field("timestamp", DataTypes.TIMESTAMP(3))
.build();
Schema schema = new Schema().schema(tableSchema);
ConnectTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true))
.withSchema(schema)
.inAppendMode();
desc.createTemporaryTable("test");
final Map<String, String> propertiesMap = desc.toProperties();
final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
String tablePath = tableEnv.getCurrentDatabase() + "." + "PravegaSink";
ConnectorCatalogTable<?, ?> connectorCatalogTable = ConnectorCatalogTable.sink(sink, false);
tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get().createTable(
ObjectPath.fromString(tablePath),
connectorCatalogTable, false);
table.insertInto(tablePath);
env.execute();
}
示例18
@Test
public void testStreamTableSinkUsingDescriptorForAvro() throws Exception {
// create a Pravega stream for test purposes
Stream stream = Stream.of(setupUtils.getScope(), "testStreamTableSinkUsingDescriptorForAvro");
this.setupUtils.createTestStream(stream.getStreamName(), 1);
// create a Flink Table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance()
// watermark is only supported in blink planner
.useBlinkPlanner()
.inStreamingMode()
.build());
Table table = tableEnv.fromDataStream(env.fromCollection(SAMPLES));
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("category")
.forStream(stream)
.withPravegaConfig(setupUtils.getPravegaConfig());
Avro avro = new Avro();
String avroSchema = "{" +
" \"type\": \"record\"," +
" \"name\": \"test\"," +
" \"fields\" : [" +
" {\"name\": \"category\", \"type\": \"string\"}," +
" {\"name\": \"value\", \"type\": \"int\"}" +
" ]" +
"}";
avro.avroSchema(avroSchema);
ConnectTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(avro)
.withSchema(new Schema().field("category", DataTypes.STRING()).
field("value", DataTypes.INT()))
.inAppendMode();
desc.createTemporaryTable("test");
final Map<String, String> propertiesMap = desc.toProperties();
final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
String tablePath = tableEnv.getCurrentDatabase() + "." + "PravegaSink";
ConnectorCatalogTable<?, ?> connectorCatalogTable = ConnectorCatalogTable.sink(sink, false);
tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get().createTable(
ObjectPath.fromString(tablePath),
connectorCatalogTable, false);
table.insertInto("PravegaSink");
env.execute();
}
示例19
@Test
@SuppressWarnings("unchecked")
public void testTableSinkDescriptor() {
final String cityName = "fruitName";
final String total = "count";
final String eventTime = "eventTime";
final String procTime = "procTime";
final String controllerUri = "tcp://localhost:9090";
final long delay = 3000L;
final String streamName = "test";
final String scopeName = "test";
Stream stream = Stream.of(scopeName, streamName);
PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
.withControllerURI(URI.create(controllerUri))
.withDefaultScope(scopeName);
// construct table sink using descriptors and table sink factory
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField(cityName)
.withWriterMode(PravegaWriterMode.EXACTLY_ONCE)
.enableWatermark(true)
.forStream(stream)
.enableMetrics(true)
.withPravegaConfig(pravegaConfig);
final FlinkPravegaTableSourceTest.TestTableDescriptor testDesc = new FlinkPravegaTableSourceTest.TestTableDescriptor(pravega)
.withFormat(new Json().failOnMissingField(false))
.withSchema(
new Schema()
.field(cityName, DataTypes.STRING())
.field(total, DataTypes.BIGINT())
.field(eventTime, DataTypes.TIMESTAMP(3))
.rowtime(new Rowtime()
.timestampsFromField(eventTime)
.watermarksFromStrategy(new BoundedOutOfOrderTimestamps(delay))
)
.field(procTime, DataTypes.TIMESTAMP(3)).proctime()
)
.inAppendMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertNotNull(sink);
}
示例20
@Test
public void testTableSinkWithLegacyProperties() {
// prepare parameters for Kafka table sink
final TableSchema schema = TableSchema.builder()
.field(FRUIT_NAME, DataTypes.STRING())
.field(COUNT, DataTypes.DECIMAL(10, 4))
.field(EVENT_TIME, DataTypes.TIMESTAMP(3))
.build();
final KafkaTableSinkBase expected = getExpectedKafkaTableSink(
schema,
TOPIC,
KAFKA_PROPERTIES,
Optional.of(new FlinkFixedPartitioner<>()),
new TestSerializationSchema(schema.toRowType()));
// construct table sink using descriptors and table sink factory
final Map<String, String> legacyPropertiesMap = new HashMap<>();
legacyPropertiesMap.putAll(createKafkaSinkProperties());
// use legacy properties
legacyPropertiesMap.remove("connector.specific-offsets");
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
legacyPropertiesMap.remove("connector.properties.group.id");
// keep compatible with a specified update-mode
legacyPropertiesMap.put("update-mode", "append");
// legacy properties for specific-offsets and properties
legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0");
legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100");
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123");
legacyPropertiesMap.put("connector.properties.0.key", "bootstrap.servers");
legacyPropertiesMap.put("connector.properties.0.value", "dummy");
legacyPropertiesMap.put("connector.properties.1.key", "group.id");
legacyPropertiesMap.put("connector.properties.1.value", "dummy");
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
.createStreamTableSink(legacyPropertiesMap);
assertEquals(expected, actualSink);
// test Kafka producer
final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
actualKafkaSink.consumeDataStream(streamMock);
assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}