Java source code examples: org.apache.flink.formats.json.JsonRowSerializationSchema
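JsonRowSerializationSchema turns a Flink Row into UTF-8 JSON bytes, using a TypeInformation<Row> (typically a RowTypeInfo) to map field names to JSON keys. Before the collected examples, here is a minimal, self-contained sketch of the class in isolation; it assumes a Flink release where the builder API used in several examples below is available, and the field names are purely illustrative:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;

public class JsonRowSerializationDemo {
    public static void main(String[] args) {
        // Describe the row layout: field names paired with their types.
        TypeInformation<Row> rowType = new RowTypeInfo(
            new TypeInformation<?>[] {Types.LONG, Types.STRING},
            new String[] {"id", "name"});

        // Build the schema; older releases used the constructor instead.
        JsonRowSerializationSchema schema =
            JsonRowSerializationSchema.builder().withTypeInfo(rowType).build();

        // Serialize one row to JSON bytes, e.g. {"id":42,"name":"flink"}.
        Row row = Row.of(42L, "flink");
        byte[] json = schema.serialize(row);
        System.out.println(new String(json, StandardCharsets.UTF_8));
    }
}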
Example 1
@Override
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
        "Number of provided field names and types does not match.");
    Elasticsearch6TableSink tableSink = new Elasticsearch6TableSink(this.elasticsearch6Properties);
    if (this.elasticsearch6Properties.getTableSchema() == null) {
        tableSink.schema = new TableSchema(fieldNames, fieldTypes);
    }
    RowTypeInfo rowTypeInfo = new RowTypeInfo(tableSink.schema.getFieldTypes(), tableSink.schema.getFieldNames());
    tableSink.serializationSchema = new JsonRowSerializationSchema(rowTypeInfo);
    return tableSink;
}
Example 2
public Elasticsearch5TableFunction(String index,
                                   Integer fieldIndex,
                                   String type,
                                   JsonRowSerializationSchema jsonRowSchema) {
    if (type == null) {
        this.type = "*";
    } else {
        this.type = type;
    }
    this.index = index;
    this.schema = jsonRowSchema;
    this.fieldIndex = fieldIndex;
}
Example 3
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
    Elasticsearch5TableSink copy = new Elasticsearch5TableSink(this.elasticsearch5Properties);
    copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
    copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
    Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
        "Number of provided field names and types does not match.");
    RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
    copy.jsonRowSchema = new JsonRowSerializationSchema(rowSchema);
    return copy;
}
Example 4
private <T> T newJson(TypeInformation typeInformation, Boolean isDeserialization) {
    if (isDeserialization) {
        return (T) new JsonRowDeserializationSchema(typeInformation);
    } else {
        return (T) new JsonRowSerializationSchema(typeInformation);
    }
}
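The helper above erases the concrete schema type behind an unchecked cast, so the caller's assignment target decides what <T> becomes. A hypothetical call site (the rowTypeInfo variable and surrounding class are assumed, not part of the original snippet) could look like:

// Hypothetical usage of the helper above; the assignment target fixes <T>.
SerializationSchema<Row> serializer = newJson(rowTypeInfo, false);
DeserializationSchema<Row> deserializer = newJson(rowTypeInfo, true);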
Example 5
@Override
public <T> T transform(TableSchema param) throws Exception {
    TableSchema tableSchema = createTableSchema();
    if (tableSchema == null) {
        tableSchema = param;
    }
    if (tableSchema == null) {
        throw new IllegalArgumentException("TableSchema must not be null");
    }
    TypeInformation[] fieldTypes = new TypeInformation[tableSchema.getFieldCount()];
    for (int i = 0; i < tableSchema.getFieldCount(); i++) {
        if (FlinkTypeFactory.isTimeIndicatorType(tableSchema.getFieldTypes()[i])) {
            // Time indicator fields cannot be serialized directly; fall back to SQL_TIMESTAMP.
            fieldTypes[i] = Types.SQL_TIMESTAMP();
        } else {
            fieldTypes[i] = tableSchema.getFieldTypes()[i];
        }
    }
    TypeInformation typeInformation = new RowTypeInfo(fieldTypes, tableSchema.getFieldNames());
    SerializationSchema<Row> rowSerializationSchema = createSerializationSchema(typeInformation);
    return (T) newTableSink(
        new TableSchema(tableSchema.getFieldNames(), fieldTypes),
        this.topic,
        PropertiesUtil.fromYamlMap(this.getProperties()),
        Optional.empty(),
        rowSerializationSchema == null ? new JsonRowSerializationSchema(typeInformation) : rowSerializationSchema
    );
}
Example 6
@Test
public void testTableSourceUsingDescriptor() throws Exception {
    StreamExecutionEnvironment execEnvWrite = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnvWrite.setParallelism(1);

    Stream stream = Stream.of(SETUP_UTILS.getScope(), "testJsonTableSource1");
    SETUP_UTILS.createTestStream(stream.getStreamName(), 1);

    // read data from the stream using Table reader
    TableSchema tableSchema = TableSchema.builder()
        .field("user", DataTypes.STRING())
        .field("uri", DataTypes.STRING())
        .field("accessTime", DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class))
        .build();
    TypeInformation<Row> typeInfo = (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(tableSchema.toRowDataType());
    PravegaConfig pravegaConfig = SETUP_UTILS.getPravegaConfig();

    // Write some data to the stream
    DataStreamSource<Row> dataStream = execEnvWrite
        .addSource(new TableEventSource(EVENT_COUNT_PER_SOURCE));
    FlinkPravegaWriter<Row> pravegaSink = FlinkPravegaWriter.<Row>builder()
        .withPravegaConfig(pravegaConfig)
        .forStream(stream)
        .withSerializationSchema(new JsonRowSerializationSchema.Builder(typeInfo).build())
        .withEventRouter((Row event) -> "fixedkey")
        .build();
    dataStream.addSink(pravegaSink);
    Assert.assertNotNull(execEnvWrite.getExecutionPlan());
    execEnvWrite.execute("PopulateRowData");

    testTableSourceStreamingDescriptor(stream, pravegaConfig);
    testTableSourceBatchDescriptor(stream, pravegaConfig);
}
Example 7
@Test
public void testTableSink() {
    // prepare parameters for Elasticsearch table sink
    final TableSchema schema = createTestSchema();
    final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
        false,
        schema,
        Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
        INDEX,
        DOC_TYPE,
        KEY_DELIMITER,
        KEY_NULL_LITERAL,
        JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
        XContentType.JSON,
        new DummyFailureHandler(),
        createTestSinkOptions(),
        IndexGeneratorFactory.createIndexGenerator(INDEX, schema));

    // construct table sink using descriptors and table sink factory
    final Map<String, String> elasticSearchProperties = createElasticSearchProperties();
    final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, elasticSearchProperties)
        .createStreamTableSink(elasticSearchProperties);

    assertEquals(expectedSink, actualSink);
}
Example 8
@Test
public void testTableSinkWithLegacyProperties() {
    // prepare parameters for Elasticsearch table sink
    final TableSchema schema = createTestSchema();
    final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
        false,
        schema,
        Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
        INDEX,
        DOC_TYPE,
        KEY_DELIMITER,
        KEY_NULL_LITERAL,
        JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
        XContentType.JSON,
        new DummyFailureHandler(),
        createTestSinkOptions(),
        IndexGeneratorFactory.createIndexGenerator(INDEX, schema));

    // construct table sink using descriptors and table sink factory
    final Map<String, String> elasticSearchProperties = createElasticSearchProperties();
    final Map<String, String> legacyPropertiesMap = new HashMap<>();
    legacyPropertiesMap.putAll(elasticSearchProperties);

    // use legacy properties
    legacyPropertiesMap.remove("connector.hosts");
    legacyPropertiesMap.put("connector.hosts.0.hostname", "host1");
    legacyPropertiesMap.put("connector.hosts.0.port", "1234");
    legacyPropertiesMap.put("connector.hosts.0.protocol", "https");

    final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
        .createStreamTableSink(legacyPropertiesMap);

    assertEquals(expectedSink, actualSink);
}
Example 9
@Test
public void testBuilder() {
    final TableSchema schema = createTestSchema();
    final TestElasticsearch6UpsertTableSink testSink = new TestElasticsearch6UpsertTableSink(
        false,
        schema,
        Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
        INDEX,
        DOC_TYPE,
        KEY_DELIMITER,
        KEY_NULL_LITERAL,
        new JsonRowSerializationSchema(schema.toRowType()),
        XContentType.JSON,
        new DummyFailureHandler(),
        createTestSinkOptions());

    final DataStreamMock dataStreamMock = new DataStreamMock(
        new StreamExecutionEnvironmentMock(),
        Types.TUPLE(Types.BOOLEAN, schema.toRowType()));
    testSink.emitDataStream(dataStreamMock);

    final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
        Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
        new ElasticsearchUpsertSinkFunction(
            INDEX,
            DOC_TYPE,
            KEY_DELIMITER,
            KEY_NULL_LITERAL,
            new JsonRowSerializationSchema(schema.toRowType()),
            XContentType.JSON,
            Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
            new int[0]));
    expectedBuilder.setFailureHandler(new DummyFailureHandler());
    expectedBuilder.setBulkFlushBackoff(true);
    expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
    expectedBuilder.setBulkFlushBackoffDelay(123);
    expectedBuilder.setBulkFlushBackoffRetries(3);
    expectedBuilder.setBulkFlushInterval(100);
    expectedBuilder.setBulkFlushMaxActions(1000);
    expectedBuilder.setBulkFlushMaxSizeMb(1);
    expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));

    assertEquals(expectedBuilder, testSink.builder);
}
Example 10
@Test
public void testTableSink() {
    // prepare parameters for Elasticsearch table sink
    final TableSchema schema = createTestSchema();
    final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
        false,
        schema,
        Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
        INDEX,
        DOC_TYPE,
        KEY_DELIMITER,
        KEY_NULL_LITERAL,
        new JsonRowSerializationSchema(schema.toRowType()),
        XContentType.JSON,
        new DummyFailureHandler(),
        createTestSinkOptions());

    // construct table sink using descriptors and table sink factory
    final TestTableDescriptor testDesc = new TestTableDescriptor(
            new Elasticsearch()
                .version(getElasticsearchVersion())
                .host(HOSTNAME, PORT, SCHEMA)
                .index(INDEX)
                .documentType(DOC_TYPE)
                .keyDelimiter(KEY_DELIMITER)
                .keyNullLiteral(KEY_NULL_LITERAL)
                .bulkFlushBackoffExponential()
                .bulkFlushBackoffDelay(123L)
                .bulkFlushBackoffMaxRetries(3)
                .bulkFlushInterval(100L)
                .bulkFlushMaxActions(1000)
                .bulkFlushMaxSize("1 MB")
                .failureHandlerCustom(DummyFailureHandler.class)
                .connectionMaxRetryTimeout(100)
                .connectionPathPrefix("/myapp"))
        .withFormat(
            new Json()
                .deriveSchema())
        .withSchema(
            new Schema()
                .field(FIELD_KEY, Types.LONG())
                .field(FIELD_FRUIT_NAME, Types.STRING())
                .field(FIELD_COUNT, Types.DECIMAL())
                .field(FIELD_TS, Types.SQL_TIMESTAMP()))
        .inUpsertMode();

    final Map<String, String> propertiesMap = testDesc.toProperties();
    final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
        .createStreamTableSink(propertiesMap);

    assertEquals(expectedSink, actualSink);
}
Example 11
@Override
protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
    return new JsonRowSerializationSchema(rowSchema);
}
Example 12
@Test
public void testBuilder() {
    final TableSchema schema = createTestSchema();
    final IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator(INDEX, schema);
    final TestElasticsearch7UpsertTableSink testSink = new TestElasticsearch7UpsertTableSink(
        false,
        schema,
        Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
        INDEX,
        DOC_TYPE,
        KEY_DELIMITER,
        KEY_NULL_LITERAL,
        JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
        XContentType.JSON,
        new DummyFailureHandler(),
        createTestSinkOptions());

    final DataStreamMock dataStreamMock = new DataStreamMock(
        new StreamExecutionEnvironmentMock(),
        Types.TUPLE(Types.BOOLEAN, schema.toRowType()));
    testSink.consumeDataStream(dataStreamMock);

    final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
        Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
        new ElasticsearchUpsertSinkFunction(
            indexGenerator,
            DOC_TYPE,
            KEY_DELIMITER,
            KEY_NULL_LITERAL,
            JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
            XContentType.JSON,
            Elasticsearch7UpsertTableSink.UPDATE_REQUEST_FACTORY,
            new int[0]));
    expectedBuilder.setFailureHandler(new DummyFailureHandler());
    expectedBuilder.setBulkFlushBackoff(true);
    expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
    expectedBuilder.setBulkFlushBackoffDelay(123);
    expectedBuilder.setBulkFlushBackoffRetries(3);
    expectedBuilder.setBulkFlushInterval(100);
    expectedBuilder.setBulkFlushMaxActions(1000);
    expectedBuilder.setBulkFlushMaxSizeMb(1);
    expectedBuilder.setRestClientFactory(new Elasticsearch7UpsertTableSink.DefaultRestClientFactory("/myapp"));

    assertEquals(expectedBuilder, testSink.builder);
}
Example 13
@Test
public void testBuilder() {
    final TableSchema schema = createTestSchema();
    final IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator(INDEX, schema);
    final TestElasticsearch6UpsertTableSink testSink = new TestElasticsearch6UpsertTableSink(
        false,
        schema,
        Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
        INDEX,
        DOC_TYPE,
        KEY_DELIMITER,
        KEY_NULL_LITERAL,
        JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
        XContentType.JSON,
        new DummyFailureHandler(),
        createTestSinkOptions());

    final DataStreamMock dataStreamMock = new DataStreamMock(
        new StreamExecutionEnvironmentMock(),
        Types.TUPLE(Types.BOOLEAN, schema.toRowType()));
    testSink.consumeDataStream(dataStreamMock);

    final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
        Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
        new ElasticsearchUpsertSinkFunction(
            indexGenerator,
            DOC_TYPE,
            KEY_DELIMITER,
            KEY_NULL_LITERAL,
            JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
            XContentType.JSON,
            Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
            new int[0]));
    expectedBuilder.setFailureHandler(new DummyFailureHandler());
    expectedBuilder.setBulkFlushBackoff(true);
    expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
    expectedBuilder.setBulkFlushBackoffDelay(123);
    expectedBuilder.setBulkFlushBackoffRetries(3);
    expectedBuilder.setBulkFlushInterval(100);
    expectedBuilder.setBulkFlushMaxActions(1000);
    expectedBuilder.setBulkFlushMaxSizeMb(1);
    expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));

    assertEquals(expectedBuilder, testSink.builder);
}
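Note that the collected examples mix two construction styles: the direct constructor, new JsonRowSerializationSchema(typeInfo), from older Flink releases, and the builder used in several of the tests above, which newer releases provide while deprecating the constructor. A minimal sketch contrasting the two, assuming a release where both still compile (rowTypeInfo is a TypeInformation<Row> as in the sketch at the top):

// Older constructor style (deprecated in newer releases):
JsonRowSerializationSchema legacySchema = new JsonRowSerializationSchema(rowTypeInfo);

// Builder style from newer releases:
JsonRowSerializationSchema builtSchema =
    JsonRowSerializationSchema.builder().withTypeInfo(rowTypeInfo).build();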