Java源码示例:org.apache.flink.table.factories.StreamTableSinkFactory

示例1
/**
 * For sink, stream name information is mandatory.
 */
@Test (expected = IllegalStateException.class)
public void testMissingStreamNameForWriter() {
    Pravega pravega = new Pravega();

    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("name");

    final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
            .withFormat(JSON)
            .withSchema(SCHEMA)
            .inAppendMode();

    final Map<String, String> propertiesMap = testDesc.toProperties();
    TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);
    fail("stream name validation failed");
}
 
示例2
/**
 * For sink, routingkey-field-name information is mandatory.
 */
@Test (expected = ValidationException.class)
public void testMissingRoutingKeyForWriter() {
    Pravega pravega = new Pravega();
    Stream stream = Stream.of(SCOPE, STREAM);

    pravega.tableSinkWriterBuilder()
            .forStream(stream)
            .withPravegaConfig(PRAVEGA_CONFIG);

    final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
            .withFormat(JSON)
            .withSchema(SCHEMA)
            .inAppendMode();

    final Map<String, String> propertiesMap = testDesc.toProperties();
    TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);
    fail("routingKey field name validation failed");
}
 
示例3
@Test (expected = ValidationException.class)
public void testInvalidWriterMode() {
    Pravega pravega = new Pravega();
    Stream stream = Stream.of(SCOPE, STREAM);

    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("name")
            .forStream(stream)
            .withPravegaConfig(PRAVEGA_CONFIG);

    final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
            .withFormat(JSON)
            .withSchema(SCHEMA)
            .inAppendMode();

    final Map<String, String> propertiesMap = testDesc.toProperties();
    Map<String, String> test = new HashMap<>(propertiesMap);
    test.put(CONNECTOR_WRITER_MODE, "foo");
    TableFactoryService.find(StreamTableSinkFactory.class, test)
            .createStreamTableSink(test);
    fail("writer mode validation failed");
}
 
示例4
@Test
public void testValidWriterModeAtleastOnce() {
    Pravega pravega = new Pravega();
    Stream stream = Stream.of(SCOPE, STREAM);

    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("name").withWriterMode(PravegaWriterMode.ATLEAST_ONCE)
            .forStream(stream)
            .withPravegaConfig(PRAVEGA_CONFIG);

    final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
            .withFormat(JSON)
            .withSchema(SCHEMA)
            .inAppendMode();

    final Map<String, String> propertiesMap = testDesc.toProperties();
    final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);
    assertNotNull(sink);
}
 
示例5
@Test
public void testValidWriterModeExactlyOnce() {
    Pravega pravega = new Pravega();
    Stream stream = Stream.of(SCOPE, STREAM);

    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("name").withWriterMode(PravegaWriterMode.EXACTLY_ONCE)
            .forStream(stream)
            .withPravegaConfig(PRAVEGA_CONFIG);

    final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
            .withFormat(JSON)
            .withSchema(SCHEMA)
            .inAppendMode();

    final Map<String, String> propertiesMap = testDesc.toProperties();
    final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);
    assertNotNull(sink);
}
 
示例6
@Test (expected = ValidationException.class)
public void testMissingFormatDefinition() {
    Pravega pravega = new Pravega();
    Stream stream = Stream.of(SCOPE, STREAM);

    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("name")
            .forStream(stream)
            .withPravegaConfig(PRAVEGA_CONFIG);

    final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
            .withSchema(SCHEMA)
            .inAppendMode();

    final Map<String, String> propertiesMap = testDesc.toProperties();
    TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);
    fail("table factory validation failed");
}
 
示例7
@Test (expected = ValidationException.class)
public void testMissingSchemaDefinition() {
    Pravega pravega = new Pravega();
    Stream stream = Stream.of(SCOPE, STREAM);

    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("name")
            .forStream(stream)
            .withPravegaConfig(PRAVEGA_CONFIG);

    final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
            .withFormat(JSON)
            .inAppendMode();

    final Map<String, String> propertiesMap = testDesc.toProperties();
    TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);
    fail("missing schema validation failed");
}
 
示例8
@Test
public void testTableSink() {
	// prepare parameters for Elasticsearch table sink

	final TableSchema schema = createTestSchema();

	final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions(),
		IndexGeneratorFactory.createIndexGenerator(INDEX, schema));

	// construct table sink using descriptors and table sink factory
	final Map<String, String> elasticSearchProperties = createElasticSearchProperties();
	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, elasticSearchProperties)
		.createStreamTableSink(elasticSearchProperties);

	assertEquals(expectedSink, actualSink);
}
 
示例9
@Test
public void testTableSinkWithLegacyProperties() {
	// prepare parameters for Elasticsearch table sink
	final TableSchema schema = createTestSchema();

	final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions(),
		IndexGeneratorFactory.createIndexGenerator(INDEX, schema));

	// construct table sink using descriptors and table sink factory
	final Map<String, String> elasticSearchProperties = createElasticSearchProperties();

	final Map<String, String> legacyPropertiesMap = new HashMap<>();
	legacyPropertiesMap.putAll(elasticSearchProperties);
	// use legacy properties
	legacyPropertiesMap.remove("connector.hosts");

	legacyPropertiesMap.put("connector.hosts.0.hostname", "host1");
	legacyPropertiesMap.put("connector.hosts.0.port", "1234");
	legacyPropertiesMap.put("connector.hosts.0.protocol", "https");

	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
		.createStreamTableSink(legacyPropertiesMap);

	assertEquals(expectedSink, actualSink);
}
 
示例10
/**
 * This test can be unified with the corresponding source test once we have fixed FLINK-9870.
 */
@Test
public void testTableSink() {
	// prepare parameters for Kafka table sink
	final TableSchema schema = TableSchema.builder()
		.field(FRUIT_NAME, DataTypes.STRING())
		.field(COUNT, DataTypes.DECIMAL(10, 4))
		.field(EVENT_TIME, DataTypes.TIMESTAMP(3))
		.build();

	final KafkaTableSinkBase expected = getExpectedKafkaTableSink(
		schema,
		TOPIC,
		KAFKA_PROPERTIES,
		Optional.of(new FlinkFixedPartitioner<>()),
		new TestSerializationSchema(schema.toRowType()));

	// construct table sink using descriptors and table sink factory
	final Map<String, String> propertiesMap = createKafkaSinkProperties();
	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
		.createStreamTableSink(propertiesMap);

	assertEquals(expected, actualSink);

	// test Kafka producer
	final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
	final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
	actualKafkaSink.consumeDataStream(streamMock);
	assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}
 
示例11
@Test
public void testTableSink() {
	// prepare parameters for Elasticsearch table sink

	final TableSchema schema = createTestSchema();

	final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		new JsonRowSerializationSchema(schema.toRowType()),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// construct table sink using descriptors and table sink factory

	final TestTableDescriptor testDesc = new TestTableDescriptor(
			new Elasticsearch()
				.version(getElasticsearchVersion())
				.host(HOSTNAME, PORT, SCHEMA)
				.index(INDEX)
				.documentType(DOC_TYPE)
				.keyDelimiter(KEY_DELIMITER)
				.keyNullLiteral(KEY_NULL_LITERAL)
				.bulkFlushBackoffExponential()
				.bulkFlushBackoffDelay(123L)
				.bulkFlushBackoffMaxRetries(3)
				.bulkFlushInterval(100L)
				.bulkFlushMaxActions(1000)
				.bulkFlushMaxSize("1 MB")
				.failureHandlerCustom(DummyFailureHandler.class)
				.connectionMaxRetryTimeout(100)
				.connectionPathPrefix("/myapp"))
		.withFormat(
			new Json()
				.deriveSchema())
		.withSchema(
			new Schema()
				.field(FIELD_KEY, Types.LONG())
				.field(FIELD_FRUIT_NAME, Types.STRING())
				.field(FIELD_COUNT, Types.DECIMAL())
				.field(FIELD_TS, Types.SQL_TIMESTAMP()))
		.inUpsertMode();

	final Map<String, String> propertiesMap = testDesc.toProperties();
	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
		.createStreamTableSink(propertiesMap);

	assertEquals(expectedSink, actualSink);
}
 
示例12
/**
 * This test can be unified with the corresponding source test once we have fixed FLINK-9870.
 */
@Test
public void testTableSink() {
	// prepare parameters for Kafka table sink

	final TableSchema schema = TableSchema.builder()
		.field(FRUIT_NAME, Types.STRING())
		.field(COUNT, Types.DECIMAL())
		.field(EVENT_TIME, Types.SQL_TIMESTAMP())
		.build();

	final KafkaTableSinkBase expected = getExpectedKafkaTableSink(
		schema,
		TOPIC,
		KAFKA_PROPERTIES,
		Optional.of(new FlinkFixedPartitioner<>()),
		new TestSerializationSchema(schema.toRowType()));

	// construct table sink using descriptors and table sink factory

	final TestTableDescriptor testDesc = new TestTableDescriptor(
			new Kafka()
				.version(getKafkaVersion())
				.topic(TOPIC)
				.properties(KAFKA_PROPERTIES)
				.sinkPartitionerFixed()
				.startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed
		.withFormat(new TestTableFormat())
		.withSchema(
			new Schema()
				.field(FRUIT_NAME, Types.STRING())
				.field(COUNT, Types.DECIMAL())
				.field(EVENT_TIME, Types.SQL_TIMESTAMP()))
		.inAppendMode();

	final Map<String, String> propertiesMap = testDesc.toProperties();
	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
		.createStreamTableSink(propertiesMap);

	assertEquals(expected, actualSink);

	// test Kafka producer
	final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
	final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
	actualKafkaSink.emitDataStream(streamMock);
	assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}
 
示例13
@Test
public void testTableSink() {
	// prepare parameters for Elasticsearch table sink

	final TableSchema schema = createTestSchema();

	final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		new JsonRowSerializationSchema(schema.toRowType()),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// construct table sink using descriptors and table sink factory

	final TestTableDescriptor testDesc = new TestTableDescriptor(
			new Elasticsearch()
				.version(getElasticsearchVersion())
				.host(HOSTNAME, PORT, SCHEMA)
				.index(INDEX)
				.documentType(DOC_TYPE)
				.keyDelimiter(KEY_DELIMITER)
				.keyNullLiteral(KEY_NULL_LITERAL)
				.bulkFlushBackoffExponential()
				.bulkFlushBackoffDelay(123L)
				.bulkFlushBackoffMaxRetries(3)
				.bulkFlushInterval(100L)
				.bulkFlushMaxActions(1000)
				.bulkFlushMaxSize("1 MB")
				.failureHandlerCustom(DummyFailureHandler.class)
				.connectionMaxRetryTimeout(100)
				.connectionPathPrefix("/myapp"))
		.withFormat(
			new Json()
				.deriveSchema())
		.withSchema(
			new Schema()
				.field(FIELD_KEY, Types.LONG())
				.field(FIELD_FRUIT_NAME, Types.STRING())
				.field(FIELD_COUNT, Types.DECIMAL())
				.field(FIELD_TS, Types.SQL_TIMESTAMP()))
		.inUpsertMode();

	final Map<String, String> propertiesMap = testDesc.toProperties();
	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
		.createStreamTableSink(propertiesMap);

	assertEquals(expectedSink, actualSink);
}
 
示例14
/**
 * This test can be unified with the corresponding source test once we have fixed FLINK-9870.
 */
@Test
public void testTableSink() {
	// prepare parameters for Kafka table sink

	final TableSchema schema = TableSchema.builder()
		.field(FRUIT_NAME, Types.STRING())
		.field(COUNT, Types.DECIMAL())
		.field(EVENT_TIME, Types.SQL_TIMESTAMP())
		.build();

	final KafkaTableSinkBase expected = getExpectedKafkaTableSink(
		schema,
		TOPIC,
		KAFKA_PROPERTIES,
		Optional.of(new FlinkFixedPartitioner<>()),
		new TestSerializationSchema(schema.toRowType()));

	// construct table sink using descriptors and table sink factory

	final TestTableDescriptor testDesc = new TestTableDescriptor(
			new Kafka()
				.version(getKafkaVersion())
				.topic(TOPIC)
				.properties(KAFKA_PROPERTIES)
				.sinkPartitionerFixed()
				.startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed
		.withFormat(new TestTableFormat())
		.withSchema(
			new Schema()
				.field(FRUIT_NAME, Types.STRING())
				.field(COUNT, Types.DECIMAL())
				.field(EVENT_TIME, Types.SQL_TIMESTAMP()))
		.inAppendMode();

	final Map<String, String> propertiesMap = testDesc.toProperties();
	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
		.createStreamTableSink(propertiesMap);

	assertEquals(expected, actualSink);

	// test Kafka producer
	final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
	final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
	actualKafkaSink.emitDataStream(streamMock);
	assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}
 
示例15
/**
 * Validates the use of Pravega Table Descriptor to generate the source/sink Table factory to
 * write and read from Pravega stream using {@link StreamTableEnvironment}
 * @throws Exception
 */
@Test
public void testStreamingTableUsingDescriptor() throws Exception {

    final String scope = setupUtils.getScope();
    final String streamName = "stream";
    Stream stream = Stream.of(scope, streamName);
    this.setupUtils.createTestStream(stream.getStreamName(), 1);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
            EnvironmentSettings.newInstance()
                    // watermark is only supported in blink planner
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build());

    PravegaConfig pravegaConfig = setupUtils.getPravegaConfig();

    Pravega pravega = new Pravega();
    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("category")
            .forStream(stream)
            .withPravegaConfig(pravegaConfig);
    pravega.tableSourceReaderBuilder()
            .withReaderGroupScope(stream.getScope())
            .forStream(stream)
            .withPravegaConfig(pravegaConfig);

    TableSchema tableSchema = TableSchema.builder()
            .field("category", DataTypes.STRING())
            .field("value", DataTypes.INT())
            .build();

    Schema schema = new Schema().schema(tableSchema);

    ConnectTableDescriptor desc = tableEnv.connect(pravega)
            .withFormat(
                new Json()
                        .failOnMissingField(false)
            )
            .withSchema(schema)
            .inAppendMode();

    desc.createTemporaryTable("test");

    final Map<String, String> propertiesMap = desc.toProperties();
    final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);
    final TableSource<?> source = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
            .createStreamTableSource(propertiesMap);

    Table table = tableEnv.fromDataStream(env.fromCollection(SAMPLES));

    String tablePathSink = tableEnv.getCurrentDatabase() + "." + "PravegaSink";

    ConnectorCatalogTable<?, ?> connectorCatalogSinkTable = ConnectorCatalogTable.sink(sink, false);

    tableEnv.getCatalog(tableEnv.getCurrentCatalog())
            .get()
            .createTable(
            ObjectPath.fromString(tablePathSink),
            connectorCatalogSinkTable, false);

    table.insertInto("PravegaSink");

    ConnectorCatalogTable<?, ?> connectorCatalogSourceTable = ConnectorCatalogTable.source(source, false);
    String tablePathSource = tableEnv.getCurrentDatabase() + "." + "samples";

    tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get().createTable(
            ObjectPath.fromString(tablePathSource),
            connectorCatalogSourceTable, false);
    // select some sample data from the Pravega-backed table, as a view
    Table view = tableEnv.sqlQuery("SELECT * FROM samples WHERE category IN ('A','B')");

    // write the view to a test sink that verifies the data for test purposes
    tableEnv.toAppendStream(view, SampleRecord.class).addSink(new TestSink(SAMPLES));

    // execute the topology
    try {
        env.execute();
        Assert.fail("expected an exception");
    } catch (Exception e) {
        // we expect the job to fail because the test sink throws a deliberate exception.
        Assert.assertTrue(ExceptionUtils.getRootCause(e) instanceof TestCompletionException);
    }
}
 
示例16
@Test
public void testStreamTableSinkUsingDescriptor() throws Exception {

    // create a Pravega stream for test purposes
    Stream stream = Stream.of(setupUtils.getScope(), "testStreamTableSinkUsingDescriptor");
    this.setupUtils.createTestStream(stream.getStreamName(), 1);

    // create a Flink Table environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
            EnvironmentSettings.newInstance()
                    // watermark is only supported in blink planner
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build());

    Table table = tableEnv.fromDataStream(env.fromCollection(SAMPLES));

    Pravega pravega = new Pravega();
    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("category")
            .forStream(stream)
            .withPravegaConfig(setupUtils.getPravegaConfig());

    ConnectTableDescriptor desc = tableEnv.connect(pravega)
            .withFormat(new Json().failOnMissingField(true))
            .withSchema(new Schema().
                    field("category", DataTypes.STRING())
                    .field("value", DataTypes.INT()))
            .inAppendMode();
    desc.createTemporaryTable("test");

    final Map<String, String> propertiesMap = desc.toProperties();
    final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);

    String tablePath = tableEnv.getCurrentDatabase() + "." + "PravegaSink";

    ConnectorCatalogTable<?, ?> connectorCatalogTable = ConnectorCatalogTable.sink(sink, false);

    tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get().createTable(
            ObjectPath.fromString(tablePath),
            connectorCatalogTable, false);

    table.insertInto("PravegaSink");
    env.execute();
}
 
示例17
@Test
public void testStreamTableSinkUsingDescriptorWithWatermark() throws Exception {
    // create a Pravega stream for test purposes
    Stream stream = Stream.of(setupUtils.getScope(), "testStreamTableSinkUsingDescriptorWithWatermark");
    this.setupUtils.createTestStream(stream.getStreamName(), 1);

    // create a Flink Table environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
            EnvironmentSettings.newInstance()
                    // watermark is only supported in blink planner
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build());
    DataStream<SampleRecordWithTimestamp> dataStream = env.fromCollection(SAMPLES)
            .map(SampleRecordWithTimestamp::new)
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SampleRecordWithTimestamp>() {
                @Override
                public long extractAscendingTimestamp(SampleRecordWithTimestamp sampleRecordWithTimestamp) {
                    return sampleRecordWithTimestamp.getTimestamp();
                }
            });

    Table table = tableEnv.fromDataStream(dataStream, "category, value, UserActionTime.rowtime");

    Pravega pravega = new Pravega();
    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("category")
            .enableWatermark(true)
            .forStream(stream)
            .withPravegaConfig(setupUtils.getPravegaConfig());

    TableSchema tableSchema = TableSchema.builder()
            .field("category", DataTypes.STRING())
            .field("value", DataTypes.INT())
            .field("timestamp", DataTypes.TIMESTAMP(3))
            .build();

    Schema schema = new Schema().schema(tableSchema);

    ConnectTableDescriptor desc = tableEnv.connect(pravega)
            .withFormat(new Json().failOnMissingField(true))
            .withSchema(schema)
            .inAppendMode();
    desc.createTemporaryTable("test");

    final Map<String, String> propertiesMap = desc.toProperties();
    final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);

    String tablePath = tableEnv.getCurrentDatabase() + "." + "PravegaSink";

    ConnectorCatalogTable<?, ?> connectorCatalogTable = ConnectorCatalogTable.sink(sink, false);

    tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get().createTable(
            ObjectPath.fromString(tablePath),
            connectorCatalogTable, false);

    table.insertInto(tablePath);
    env.execute();
}
 
示例18
@Test
public void testStreamTableSinkUsingDescriptorForAvro() throws Exception {

    // create a Pravega stream for test purposes
    Stream stream = Stream.of(setupUtils.getScope(), "testStreamTableSinkUsingDescriptorForAvro");
    this.setupUtils.createTestStream(stream.getStreamName(), 1);

    // create a Flink Table environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
            EnvironmentSettings.newInstance()
                    // watermark is only supported in blink planner
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build());

    Table table = tableEnv.fromDataStream(env.fromCollection(SAMPLES));

    Pravega pravega = new Pravega();
    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("category")
            .forStream(stream)
            .withPravegaConfig(setupUtils.getPravegaConfig());

    Avro avro = new Avro();
    String avroSchema =  "{" +
            "  \"type\": \"record\"," +
            "  \"name\": \"test\"," +
            "  \"fields\" : [" +
            "    {\"name\": \"category\", \"type\": \"string\"}," +
            "    {\"name\": \"value\", \"type\": \"int\"}" +
            "  ]" +
            "}";
    avro.avroSchema(avroSchema);

    ConnectTableDescriptor desc = tableEnv.connect(pravega)
            .withFormat(avro)
            .withSchema(new Schema().field("category", DataTypes.STRING()).
                    field("value", DataTypes.INT()))
            .inAppendMode();
    desc.createTemporaryTable("test");

    final Map<String, String> propertiesMap = desc.toProperties();
    final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);

    String tablePath = tableEnv.getCurrentDatabase() + "." + "PravegaSink";

    ConnectorCatalogTable<?, ?> connectorCatalogTable = ConnectorCatalogTable.sink(sink, false);

    tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get().createTable(
            ObjectPath.fromString(tablePath),
            connectorCatalogTable, false);

    table.insertInto("PravegaSink");
    env.execute();
}
 
示例19
@Test
@SuppressWarnings("unchecked")
public void testTableSinkDescriptor() {
    final String cityName = "fruitName";
    final String total = "count";
    final String eventTime = "eventTime";
    final String procTime = "procTime";
    final String controllerUri = "tcp://localhost:9090";
    final long delay = 3000L;
    final String streamName = "test";
    final String scopeName = "test";

    Stream stream = Stream.of(scopeName, streamName);
    PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
            .withControllerURI(URI.create(controllerUri))
            .withDefaultScope(scopeName);

    // construct table sink using descriptors and table sink factory
    Pravega pravega = new Pravega();
    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField(cityName)
            .withWriterMode(PravegaWriterMode.EXACTLY_ONCE)
            .enableWatermark(true)
            .forStream(stream)
            .enableMetrics(true)
            .withPravegaConfig(pravegaConfig);

    final FlinkPravegaTableSourceTest.TestTableDescriptor testDesc = new FlinkPravegaTableSourceTest.TestTableDescriptor(pravega)
            .withFormat(new Json().failOnMissingField(false))
            .withSchema(
                    new Schema()
                            .field(cityName, DataTypes.STRING())
                            .field(total, DataTypes.BIGINT())
                            .field(eventTime, DataTypes.TIMESTAMP(3))
                            .rowtime(new Rowtime()
                                    .timestampsFromField(eventTime)
                                    .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(delay))
                            )
                            .field(procTime, DataTypes.TIMESTAMP(3)).proctime()
            )
            .inAppendMode();

    final Map<String, String> propertiesMap = testDesc.toProperties();
    final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
            .createStreamTableSink(propertiesMap);

    assertNotNull(sink);
}
 
示例20
@Test
public void testTableSinkWithLegacyProperties() {
	// prepare parameters for Kafka table sink
	final TableSchema schema = TableSchema.builder()
		.field(FRUIT_NAME, DataTypes.STRING())
		.field(COUNT, DataTypes.DECIMAL(10, 4))
		.field(EVENT_TIME, DataTypes.TIMESTAMP(3))
		.build();

	final KafkaTableSinkBase expected = getExpectedKafkaTableSink(
		schema,
		TOPIC,
		KAFKA_PROPERTIES,
		Optional.of(new FlinkFixedPartitioner<>()),
		new TestSerializationSchema(schema.toRowType()));

	// construct table sink using descriptors and table sink factory
	final Map<String, String> legacyPropertiesMap = new HashMap<>();
	legacyPropertiesMap.putAll(createKafkaSinkProperties());

	// use legacy properties
	legacyPropertiesMap.remove("connector.specific-offsets");
	legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
	legacyPropertiesMap.remove("connector.properties.group.id");

	// keep compatible with a specified update-mode
	legacyPropertiesMap.put("update-mode", "append");

	// legacy properties for specific-offsets and properties
	legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0");
	legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100");
	legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
	legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123");
	legacyPropertiesMap.put("connector.properties.0.key", "bootstrap.servers");
	legacyPropertiesMap.put("connector.properties.0.value", "dummy");
	legacyPropertiesMap.put("connector.properties.1.key", "group.id");
	legacyPropertiesMap.put("connector.properties.1.value", "dummy");

	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
		.createStreamTableSink(legacyPropertiesMap);

	assertEquals(expected, actualSink);

	// test Kafka producer
	final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
	final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
	actualKafkaSink.consumeDataStream(streamMock);
	assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}