Java源码示例:org.apache.flink.formats.json.JsonRowSerializationSchema

示例1
/**
 * Creates a new {@code Elasticsearch6TableSink} from this sink's properties and
 * attaches a JSON row serialization schema derived from the sink's table schema.
 *
 * <p>The supplied field names and types are only used to build the schema when
 * the Elasticsearch properties do not already provide a table schema.
 *
 * @param fieldNames names of the table fields; must not be {@code null}
 * @param fieldTypes types of the table fields; must not be {@code null} and must
 *                   have the same length as {@code fieldNames}
 * @return the newly created and configured sink
 * @throws NullPointerException     if {@code fieldNames} or {@code fieldTypes} is {@code null}
 * @throws IllegalArgumentException if the two arrays differ in length
 */
@Override
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    // Fail with a descriptive message instead of a bare NPE on the length comparison below.
    Preconditions.checkNotNull(fieldNames, "fieldNames");
    Preconditions.checkNotNull(fieldTypes, "fieldTypes");
    Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
            "Number of provided field names and types does not match.");

    Elasticsearch6TableSink tableSink = new Elasticsearch6TableSink(this.elasticsearch6Properties);
    if (this.elasticsearch6Properties.getTableSchema() == null) {
        tableSink.schema = new TableSchema(fieldNames, fieldTypes);
    }
    // NOTE(review): when the properties already carry a schema, tableSink.schema is presumably
    // populated by the constructor above; confirm, otherwise the next line dereferences null.
    RowTypeInfo rowTypeInfo = new RowTypeInfo(tableSink.schema.getFieldTypes(), tableSink.schema.getFieldNames());
    tableSink.serializationSchema = new JsonRowSerializationSchema(rowTypeInfo);
    return tableSink;
}
 
示例2
/**
 * Stores the index, field index, document type and JSON row schema on the new instance.
 *
 * @param index         value assigned to the {@code index} field
 * @param fieldIndex    value assigned to the {@code fieldIndex} field
 * @param type          value assigned to the {@code type} field; {@code null} is replaced by {@code "*"}
 * @param jsonRowSchema value assigned to the {@code schema} field
 */
public Elasticsearch5TableFunction(String index,
                                   Integer fieldIndex,
                                   String type,
                                   JsonRowSerializationSchema jsonRowSchema) {
    this.index = index;
    this.fieldIndex = fieldIndex;
    this.schema = jsonRowSchema;
    // A missing type falls back to the "*" literal.
    this.type = (type != null) ? type : "*";
}
 
示例3
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    Elasticsearch5TableSink copy = new Elasticsearch5TableSink(this.elasticsearch5Properties);
    copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
    copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
    Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
            "Number of provided field names and types does not match.");

    RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
    copy.jsonRowSchema = new JsonRowSerializationSchema(rowSchema);

    return copy;
}
 
示例4
/**
 * Creates either a JSON row deserialization schema or a JSON row serialization
 * schema for the given type information and returns it cast to the caller's
 * expected type.
 *
 * @param typeInformation   type information handed to the schema constructor
 * @param isDeserialization {@code true} to create a {@code JsonRowDeserializationSchema},
 *                          {@code false} to create a {@code JsonRowSerializationSchema}
 * @param <T>               type the caller expects; the cast is unchecked
 * @return the newly created schema
 * @throws NullPointerException if {@code isDeserialization} is {@code null} (it is unboxed)
 */
private <T> T newJson(TypeInformation typeInformation, Boolean isDeserialization) {
    if (!isDeserialization) {
        return (T) new JsonRowSerializationSchema(typeInformation);
    }
    return (T) new JsonRowDeserializationSchema(typeInformation);
}
 
示例5
@Override
public <T> T transform(TableSchema param) throws Exception {
    TableSchema tableSchema = createTableSchema();
    if (tableSchema == null) {
        tableSchema = param;
    }
    if (tableSchema == null) {
        throw new IllegalArgumentException("TableSchema must be not null");
    }
    TypeInformation[] fieldTypes = new TypeInformation[tableSchema.getFieldCount()];
    for (int i = 0; i < tableSchema.getFieldCount(); i++) {
        if (FlinkTypeFactory.isTimeIndicatorType(tableSchema.getFieldTypes()[i])) {
            fieldTypes[i] = Types.SQL_TIMESTAMP();
        }else{
            fieldTypes[i] = tableSchema.getFieldTypes()[i];
        }
    }
    TypeInformation typeInformation = new RowTypeInfo(fieldTypes, tableSchema.getFieldNames());
    SerializationSchema<Row> rowSerializationSchema = createSerializationSchema(typeInformation);
    return (T) newTableSink(
        new TableSchema(tableSchema.getFieldNames(), fieldTypes),
        this.topic,
        PropertiesUtil.fromYamlMap(this.getProperties()),
        Optional.empty(),
        rowSerializationSchema == null ? new JsonRowSerializationSchema(typeInformation) : rowSerializationSchema
    );
}
 
示例6
/**
 * Writes JSON-serialized rows into a newly created test stream and then runs the
 * streaming and batch descriptor checks against that stream.
 */
@Test
public void testTableSourceUsingDescriptor() throws Exception {
    final StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    writeEnv.setParallelism(1);

    final Stream targetStream = Stream.of(SETUP_UTILS.getScope(), "testJsonTableSource1");
    SETUP_UTILS.createTestStream(targetStream.getStreamName(), 1);

    // Row layout used to derive the type information for the JSON serializer.
    final TableSchema rowSchema = TableSchema.builder()
            .field("user", DataTypes.STRING())
            .field("uri", DataTypes.STRING())
            .field("accessTime", DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class))
            .build();
    final TypeInformation<Row> rowTypeInfo =
            (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(rowSchema.toRowDataType());

    final PravegaConfig config = SETUP_UTILS.getPravegaConfig();

    // Source of the rows that get written to the stream.
    final DataStreamSource<Row> rows = writeEnv
            .addSource(new TableEventSource(EVENT_COUNT_PER_SOURCE));

    final JsonRowSerializationSchema jsonSerializer =
            new JsonRowSerializationSchema.Builder(rowTypeInfo).build();
    final FlinkPravegaWriter<Row> writer = FlinkPravegaWriter.<Row>builder()
            .withPravegaConfig(config)
            .forStream(targetStream)
            .withSerializationSchema(jsonSerializer)
            .withEventRouter((Row event) -> "fixedkey")
            .build();

    rows.addSink(writer);
    Assert.assertNotNull(writeEnv.getExecutionPlan());
    writeEnv.execute("PopulateRowData");

    // Run both descriptor variants against the populated stream.
    testTableSourceStreamingDescriptor(targetStream, config);
    testTableSourceBatchDescriptor(targetStream, config);
}
 
示例7
/**
 * Checks that the sink created by the table factory from the Elasticsearch
 * properties equals a sink assembled directly from the test constants.
 */
@Test
public void testTableSink() {
	final TableSchema testSchema = createTestSchema();

	// Sink assembled by hand from the test constants.
	final JsonRowSerializationSchema expectedSerializer =
		JsonRowSerializationSchema.builder().withTypeInfo(testSchema.toRowType()).build();
	final ElasticsearchUpsertTableSinkBase expected = getExpectedTableSink(
		false,
		testSchema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		expectedSerializer,
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions(),
		IndexGeneratorFactory.createIndexGenerator(INDEX, testSchema));

	// Sink produced by the factory discovered for the properties.
	final Map<String, String> properties = createElasticSearchProperties();
	final StreamTableSinkFactory<?> sinkFactory =
		TableFactoryService.find(StreamTableSinkFactory.class, properties);
	final TableSink<?> actual = sinkFactory.createStreamTableSink(properties);

	assertEquals(expected, actual);
}
 
示例8
/**
 * Checks that the table factory builds the expected sink when the hosts are
 * given through the indexed {@code connector.hosts.0.*} keys instead of the
 * single {@code connector.hosts} key.
 */
@Test
public void testTableSinkWithLegacyProperties() {
	final TableSchema testSchema = createTestSchema();

	// Sink assembled by hand from the test constants.
	final JsonRowSerializationSchema expectedSerializer =
		JsonRowSerializationSchema.builder().withTypeInfo(testSchema.toRowType()).build();
	final ElasticsearchUpsertTableSinkBase expected = getExpectedTableSink(
		false,
		testSchema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		expectedSerializer,
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions(),
		IndexGeneratorFactory.createIndexGenerator(INDEX, testSchema));

	// Start from the regular properties, then swap the hosts entry for the indexed keys.
	final Map<String, String> baseProperties = createElasticSearchProperties();
	final Map<String, String> legacyProperties = new HashMap<>(baseProperties);
	legacyProperties.remove("connector.hosts");
	legacyProperties.put("connector.hosts.0.hostname", "host1");
	legacyProperties.put("connector.hosts.0.port", "1234");
	legacyProperties.put("connector.hosts.0.protocol", "https");

	final StreamTableSinkFactory<?> sinkFactory =
		TableFactoryService.find(StreamTableSinkFactory.class, legacyProperties);
	final TableSink<?> actual = sinkFactory.createStreamTableSink(legacyProperties);

	assertEquals(expected, actual);
}
 
示例9
/**
 * Checks that emitting a data stream through the test sink leaves the sink with
 * an {@code ElasticsearchSink.Builder} equal to one configured by hand.
 */
@Test
public void testBuilder() {
	final TableSchema testSchema = createTestSchema();

	final JsonRowSerializationSchema sinkSerializer = new JsonRowSerializationSchema(testSchema.toRowType());
	final TestElasticsearch6UpsertTableSink sinkUnderTest = new TestElasticsearch6UpsertTableSink(
		false,
		testSchema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		sinkSerializer,
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// Emit a mocked stream through the sink.
	final DataStreamMock mockedStream = new DataStreamMock(
			new StreamExecutionEnvironmentMock(),
			Types.TUPLE(Types.BOOLEAN, testSchema.toRowType()));
	sinkUnderTest.emitDataStream(mockedStream);

	// Hand-built builder the sink's builder is compared against.
	final ElasticsearchUpsertSinkFunction expectedFunction = new ElasticsearchUpsertSinkFunction(
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		new JsonRowSerializationSchema(testSchema.toRowType()),
		XContentType.JSON,
		Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
		new int[0]);
	final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expected = new ElasticsearchSink.Builder<>(
		Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
		expectedFunction);
	expected.setFailureHandler(new DummyFailureHandler());
	expected.setBulkFlushBackoff(true);
	expected.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
	expected.setBulkFlushBackoffDelay(123);
	expected.setBulkFlushBackoffRetries(3);
	expected.setBulkFlushInterval(100);
	expected.setBulkFlushMaxActions(1000);
	expected.setBulkFlushMaxSizeMb(1);
	expected.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));

	assertEquals(expected, sinkUnderTest.builder);
}
 
示例10
/**
 * Checks that the sink created by the table factory from a descriptor-generated
 * property map equals a sink assembled directly from the test constants.
 */
@Test
public void testTableSink() {
	final TableSchema testSchema = createTestSchema();

	// Sink assembled by hand from the test constants.
	final JsonRowSerializationSchema expectedSerializer = new JsonRowSerializationSchema(testSchema.toRowType());
	final ElasticsearchUpsertTableSinkBase expected = getExpectedTableSink(
		false,
		testSchema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		expectedSerializer,
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// Connector, format and schema parts of the descriptor.
	final Elasticsearch connector = new Elasticsearch()
		.version(getElasticsearchVersion())
		.host(HOSTNAME, PORT, SCHEMA)
		.index(INDEX)
		.documentType(DOC_TYPE)
		.keyDelimiter(KEY_DELIMITER)
		.keyNullLiteral(KEY_NULL_LITERAL)
		.bulkFlushBackoffExponential()
		.bulkFlushBackoffDelay(123L)
		.bulkFlushBackoffMaxRetries(3)
		.bulkFlushInterval(100L)
		.bulkFlushMaxActions(1000)
		.bulkFlushMaxSize("1 MB")
		.failureHandlerCustom(DummyFailureHandler.class)
		.connectionMaxRetryTimeout(100)
		.connectionPathPrefix("/myapp");
	final Json format = new Json().deriveSchema();
	final Schema tableFields = new Schema()
		.field(FIELD_KEY, Types.LONG())
		.field(FIELD_FRUIT_NAME, Types.STRING())
		.field(FIELD_COUNT, Types.DECIMAL())
		.field(FIELD_TS, Types.SQL_TIMESTAMP());

	final TestTableDescriptor descriptor = new TestTableDescriptor(connector)
		.withFormat(format)
		.withSchema(tableFields)
		.inUpsertMode();

	// Sink produced by the factory discovered for the descriptor's properties.
	final Map<String, String> properties = descriptor.toProperties();
	final StreamTableSinkFactory<?> sinkFactory =
		TableFactoryService.find(StreamTableSinkFactory.class, properties);
	final TableSink<?> actual = sinkFactory.createStreamTableSink(properties);

	assertEquals(expected, actual);
}
 
示例11
/**
 * Checks that emitting a data stream through the test sink leaves the sink with
 * an {@code ElasticsearchSink.Builder} equal to one configured by hand.
 */
@Test
public void testBuilder() {
	final TableSchema testSchema = createTestSchema();

	final JsonRowSerializationSchema sinkSerializer = new JsonRowSerializationSchema(testSchema.toRowType());
	final TestElasticsearch6UpsertTableSink sinkUnderTest = new TestElasticsearch6UpsertTableSink(
		false,
		testSchema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		sinkSerializer,
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// Emit a mocked stream through the sink.
	final DataStreamMock mockedStream = new DataStreamMock(
			new StreamExecutionEnvironmentMock(),
			Types.TUPLE(Types.BOOLEAN, testSchema.toRowType()));
	sinkUnderTest.emitDataStream(mockedStream);

	// Hand-built builder the sink's builder is compared against.
	final ElasticsearchUpsertSinkFunction expectedFunction = new ElasticsearchUpsertSinkFunction(
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		new JsonRowSerializationSchema(testSchema.toRowType()),
		XContentType.JSON,
		Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
		new int[0]);
	final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expected = new ElasticsearchSink.Builder<>(
		Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
		expectedFunction);
	expected.setFailureHandler(new DummyFailureHandler());
	expected.setBulkFlushBackoff(true);
	expected.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
	expected.setBulkFlushBackoffDelay(123);
	expected.setBulkFlushBackoffRetries(3);
	expected.setBulkFlushInterval(100);
	expected.setBulkFlushMaxActions(1000);
	expected.setBulkFlushMaxSizeMb(1);
	expected.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));

	assertEquals(expected, sinkUnderTest.builder);
}
 
示例12
/**
 * Checks that the sink created by the table factory from a descriptor-generated
 * property map equals a sink assembled directly from the test constants.
 */
@Test
public void testTableSink() {
	final TableSchema testSchema = createTestSchema();

	// Sink assembled by hand from the test constants.
	final JsonRowSerializationSchema expectedSerializer = new JsonRowSerializationSchema(testSchema.toRowType());
	final ElasticsearchUpsertTableSinkBase expected = getExpectedTableSink(
		false,
		testSchema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		expectedSerializer,
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// Connector, format and schema parts of the descriptor.
	final Elasticsearch connector = new Elasticsearch()
		.version(getElasticsearchVersion())
		.host(HOSTNAME, PORT, SCHEMA)
		.index(INDEX)
		.documentType(DOC_TYPE)
		.keyDelimiter(KEY_DELIMITER)
		.keyNullLiteral(KEY_NULL_LITERAL)
		.bulkFlushBackoffExponential()
		.bulkFlushBackoffDelay(123L)
		.bulkFlushBackoffMaxRetries(3)
		.bulkFlushInterval(100L)
		.bulkFlushMaxActions(1000)
		.bulkFlushMaxSize("1 MB")
		.failureHandlerCustom(DummyFailureHandler.class)
		.connectionMaxRetryTimeout(100)
		.connectionPathPrefix("/myapp");
	final Json format = new Json().deriveSchema();
	final Schema tableFields = new Schema()
		.field(FIELD_KEY, Types.LONG())
		.field(FIELD_FRUIT_NAME, Types.STRING())
		.field(FIELD_COUNT, Types.DECIMAL())
		.field(FIELD_TS, Types.SQL_TIMESTAMP());

	final TestTableDescriptor descriptor = new TestTableDescriptor(connector)
		.withFormat(format)
		.withSchema(tableFields)
		.inUpsertMode();

	// Sink produced by the factory discovered for the descriptor's properties.
	final Map<String, String> properties = descriptor.toProperties();
	final StreamTableSinkFactory<?> sinkFactory =
		TableFactoryService.find(StreamTableSinkFactory.class, properties);
	final TableSink<?> actual = sinkFactory.createStreamTableSink(properties);

	assertEquals(expected, actual);
}
 
示例13
/**
 * Creates the JSON row serialization schema for the given row type.
 *
 * @param rowSchema row type information passed to the schema constructor
 * @return a new {@code JsonRowSerializationSchema}
 */
@Override
protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
    final SerializationSchema<Row> jsonSchema = new JsonRowSerializationSchema(rowSchema);
    return jsonSchema;
}
 
示例14
/**
 * Checks that consuming a data stream through the Elasticsearch 7 test sink
 * leaves the sink with an {@code ElasticsearchSink.Builder} equal to one
 * configured by hand.
 */
@Test
public void testBuilder() {
	final TableSchema testSchema = createTestSchema();
	final IndexGenerator generator = IndexGeneratorFactory.createIndexGenerator(INDEX, testSchema);

	final JsonRowSerializationSchema sinkSerializer =
		JsonRowSerializationSchema.builder().withTypeInfo(testSchema.toRowType()).build();
	final TestElasticsearch7UpsertTableSink sinkUnderTest = new TestElasticsearch7UpsertTableSink(
		false,
		testSchema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		sinkSerializer,
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// Feed a mocked stream into the sink.
	final DataStreamMock mockedStream = new DataStreamMock(
			new StreamExecutionEnvironmentMock(),
			Types.TUPLE(Types.BOOLEAN, testSchema.toRowType()));
	sinkUnderTest.consumeDataStream(mockedStream);

	// Hand-built builder the sink's builder is compared against.
	final ElasticsearchUpsertSinkFunction expectedFunction = new ElasticsearchUpsertSinkFunction(
		generator,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		JsonRowSerializationSchema.builder().withTypeInfo(testSchema.toRowType()).build(),
		XContentType.JSON,
		Elasticsearch7UpsertTableSink.UPDATE_REQUEST_FACTORY,
		new int[0]);
	final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expected = new ElasticsearchSink.Builder<>(
		Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
		expectedFunction);
	expected.setFailureHandler(new DummyFailureHandler());
	expected.setBulkFlushBackoff(true);
	expected.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
	expected.setBulkFlushBackoffDelay(123);
	expected.setBulkFlushBackoffRetries(3);
	expected.setBulkFlushInterval(100);
	expected.setBulkFlushMaxActions(1000);
	expected.setBulkFlushMaxSizeMb(1);
	expected.setRestClientFactory(new Elasticsearch7UpsertTableSink.DefaultRestClientFactory("/myapp"));

	assertEquals(expected, sinkUnderTest.builder);
}
 
示例15
/**
 * Checks that consuming a data stream through the Elasticsearch 6 test sink
 * leaves the sink with an {@code ElasticsearchSink.Builder} equal to one
 * configured by hand.
 */
@Test
public void testBuilder() {
	final TableSchema testSchema = createTestSchema();
	final IndexGenerator generator = IndexGeneratorFactory.createIndexGenerator(INDEX, testSchema);

	final JsonRowSerializationSchema sinkSerializer =
		JsonRowSerializationSchema.builder().withTypeInfo(testSchema.toRowType()).build();
	final TestElasticsearch6UpsertTableSink sinkUnderTest = new TestElasticsearch6UpsertTableSink(
		false,
		testSchema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		sinkSerializer,
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// Feed a mocked stream into the sink.
	final DataStreamMock mockedStream = new DataStreamMock(
			new StreamExecutionEnvironmentMock(),
			Types.TUPLE(Types.BOOLEAN, testSchema.toRowType()));
	sinkUnderTest.consumeDataStream(mockedStream);

	// Hand-built builder the sink's builder is compared against.
	final ElasticsearchUpsertSinkFunction expectedFunction = new ElasticsearchUpsertSinkFunction(
		generator,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		JsonRowSerializationSchema.builder().withTypeInfo(testSchema.toRowType()).build(),
		XContentType.JSON,
		Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
		new int[0]);
	final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expected = new ElasticsearchSink.Builder<>(
		Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
		expectedFunction);
	expected.setFailureHandler(new DummyFailureHandler());
	expected.setBulkFlushBackoff(true);
	expected.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
	expected.setBulkFlushBackoffDelay(123);
	expected.setBulkFlushBackoffRetries(3);
	expected.setBulkFlushInterval(100);
	expected.setBulkFlushMaxActions(1000);
	expected.setBulkFlushMaxSizeMb(1);
	expected.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));

	assertEquals(expected, sinkUnderTest.builder);
}