Java源码示例:org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host
示例1
@Override
protected ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
return new Elasticsearch6UpsertTableSink(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例2
public TestElasticsearch6UpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
super(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例3
@Override
protected ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
return new Elasticsearch6UpsertTableSink(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例4
public TestElasticsearch6UpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
super(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例5
@Override
protected ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
return new Elasticsearch7UpsertTableSink(
isAppendOnly,
schema,
hosts,
index,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例6
@Override
protected ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions,
IndexGenerator indexGenerator) {
return new Elasticsearch7UpsertTableSink(
isAppendOnly,
schema,
hosts,
index,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例7
public TestElasticsearch7UpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
super(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例8
public TestElasticsearch6UpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
super(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例9
@Override
protected ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
return new Elasticsearch6UpsertTableSink(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例10
protected abstract ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
示例11
private List<Host> getHosts(DescriptorProperties descriptorProperties) {
final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
CONNECTOR_HOSTS,
Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
return hosts.stream()
.map(host -> new Host(
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))))
.collect(Collectors.toList());
}
示例12
protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
示例13
@Override
protected ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
return new Elasticsearch6UpsertTableSink(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例14
protected abstract ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
示例15
private List<Host> getHosts(DescriptorProperties descriptorProperties) {
final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
CONNECTOR_HOSTS,
Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
return hosts.stream()
.map(host -> new Host(
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))))
.collect(Collectors.toList());
}
示例16
protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
示例17
@Override
protected ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions) {
return new Elasticsearch6UpsertTableSink(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例18
@Override
protected ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions,
IndexGenerator indexGenerator) {
return new Elasticsearch6UpsertTableSink(
isAppendOnly,
schema,
hosts,
index,
docType,
keyDelimiter,
keyNullLiteral,
serializationSchema,
contentType,
failureHandler,
sinkOptions);
}
示例19
@Override
protected Map<String, String> toConnectorProperties() {
final DescriptorProperties properties = new DescriptorProperties();
properties.putProperties(internalProperties);
if (hosts.size() > 0) {
properties.putString(
CONNECTOR_HOSTS,
hosts.stream()
.map(Host::toString)
.collect(Collectors.joining(";")));
}
return properties.asMap();
}
示例20
/**
* Parse Hosts String to list.
*
* <p>Hosts String format was given as following:
*
* <pre>
* connector.hosts = http://host_name:9092;http://host_name:9093
* </pre>
*/
public static List<Host> validateAndParseHostsString(DescriptorProperties descriptorProperties) {
final List<Host> hostList = new ArrayList<>();
descriptorProperties.validateString(CONNECTOR_HOSTS, false, 1);
final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS);
final String[] hosts = hostsStr.split(";");
final String validationExceptionMessage = "Properties '" + CONNECTOR_HOSTS + "' format should " +
"follow the format 'http://host_name:port', but is '" + hostsStr + "'.";
if (hosts.length == 0) {
throw new ValidationException(validationExceptionMessage);
}
for (String host : hosts) {
try {
final URL url = new URL(host);
final String protocol = url.getProtocol();
final String hostName = url.getHost();
final int hostPort = url.getPort();
if (StringUtils.isNullOrWhitespaceOnly(protocol) ||
StringUtils.isNullOrWhitespaceOnly(hostName) ||
-1 == hostPort) {
throw new ValidationException(validationExceptionMessage);
}
hostList.add(new Host(hostName, hostPort, protocol));
} catch (MalformedURLException e) {
throw new ValidationException(validationExceptionMessage, e);
}
}
return hostList;
}
示例21
protected abstract ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
示例22
private List<Host> getHosts(DescriptorProperties descriptorProperties) {
if (descriptorProperties.containsKey(CONNECTOR_HOSTS)) {
return validateAndParseHostsString(descriptorProperties);
} else {
final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
CONNECTOR_HOSTS,
Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
return hosts.stream()
.map(host -> new Host(
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))))
.collect(Collectors.toList());
}
}
示例23
@Test
public void testTableSink() {
// prepare parameters for Elasticsearch table sink
final TableSchema schema = createTestSchema();
final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions(),
IndexGeneratorFactory.createIndexGenerator(INDEX, schema));
// construct table sink using descriptors and table sink factory
final Map<String, String> elasticSearchProperties = createElasticSearchProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, elasticSearchProperties)
.createStreamTableSink(elasticSearchProperties);
assertEquals(expectedSink, actualSink);
}
示例24
@Test
public void testTableSinkWithLegacyProperties() {
// prepare parameters for Elasticsearch table sink
final TableSchema schema = createTestSchema();
final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions(),
IndexGeneratorFactory.createIndexGenerator(INDEX, schema));
// construct table sink using descriptors and table sink factory
final Map<String, String> elasticSearchProperties = createElasticSearchProperties();
final Map<String, String> legacyPropertiesMap = new HashMap<>();
legacyPropertiesMap.putAll(elasticSearchProperties);
// use legacy properties
legacyPropertiesMap.remove("connector.hosts");
legacyPropertiesMap.put("connector.hosts.0.hostname", "host1");
legacyPropertiesMap.put("connector.hosts.0.port", "1234");
legacyPropertiesMap.put("connector.hosts.0.protocol", "https");
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
.createStreamTableSink(legacyPropertiesMap);
assertEquals(expectedSink, actualSink);
}
示例25
protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions,
IndexGenerator indexGenerator);
示例26
@Test
public void testBuilder() {
final TableSchema schema = createTestSchema();
final TestElasticsearch6UpsertTableSink testSink = new TestElasticsearch6UpsertTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
new JsonRowSerializationSchema(schema.toRowType()),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions());
final DataStreamMock dataStreamMock = new DataStreamMock(
new StreamExecutionEnvironmentMock(),
Types.TUPLE(Types.BOOLEAN, schema.toRowType()));
testSink.emitDataStream(dataStreamMock);
final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
new ElasticsearchUpsertSinkFunction(
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
new JsonRowSerializationSchema(schema.toRowType()),
XContentType.JSON,
Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
new int[0]));
expectedBuilder.setFailureHandler(new DummyFailureHandler());
expectedBuilder.setBulkFlushBackoff(true);
expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
expectedBuilder.setBulkFlushBackoffDelay(123);
expectedBuilder.setBulkFlushBackoffRetries(3);
expectedBuilder.setBulkFlushInterval(100);
expectedBuilder.setBulkFlushMaxActions(1000);
expectedBuilder.setBulkFlushMaxSizeMb(1);
expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));
assertEquals(expectedBuilder, testSink.builder);
}
示例27
@Test
public void testTableSink() {
// prepare parameters for Elasticsearch table sink
final TableSchema schema = createTestSchema();
final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
new JsonRowSerializationSchema(schema.toRowType()),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions());
// construct table sink using descriptors and table sink factory
final TestTableDescriptor testDesc = new TestTableDescriptor(
new Elasticsearch()
.version(getElasticsearchVersion())
.host(HOSTNAME, PORT, SCHEMA)
.index(INDEX)
.documentType(DOC_TYPE)
.keyDelimiter(KEY_DELIMITER)
.keyNullLiteral(KEY_NULL_LITERAL)
.bulkFlushBackoffExponential()
.bulkFlushBackoffDelay(123L)
.bulkFlushBackoffMaxRetries(3)
.bulkFlushInterval(100L)
.bulkFlushMaxActions(1000)
.bulkFlushMaxSize("1 MB")
.failureHandlerCustom(DummyFailureHandler.class)
.connectionMaxRetryTimeout(100)
.connectionPathPrefix("/myapp"))
.withFormat(
new Json()
.deriveSchema())
.withSchema(
new Schema()
.field(FIELD_KEY, Types.LONG())
.field(FIELD_FRUIT_NAME, Types.STRING())
.field(FIELD_COUNT, Types.DECIMAL())
.field(FIELD_TS, Types.SQL_TIMESTAMP()))
.inUpsertMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertEquals(expectedSink, actualSink);
}
示例28
@Test
public void testBuilder() {
final TableSchema schema = createTestSchema();
final TestElasticsearch6UpsertTableSink testSink = new TestElasticsearch6UpsertTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
new JsonRowSerializationSchema(schema.toRowType()),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions());
final DataStreamMock dataStreamMock = new DataStreamMock(
new StreamExecutionEnvironmentMock(),
Types.TUPLE(Types.BOOLEAN, schema.toRowType()));
testSink.emitDataStream(dataStreamMock);
final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
new ElasticsearchUpsertSinkFunction(
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
new JsonRowSerializationSchema(schema.toRowType()),
XContentType.JSON,
Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
new int[0]));
expectedBuilder.setFailureHandler(new DummyFailureHandler());
expectedBuilder.setBulkFlushBackoff(true);
expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
expectedBuilder.setBulkFlushBackoffDelay(123);
expectedBuilder.setBulkFlushBackoffRetries(3);
expectedBuilder.setBulkFlushInterval(100);
expectedBuilder.setBulkFlushMaxActions(1000);
expectedBuilder.setBulkFlushMaxSizeMb(1);
expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));
assertEquals(expectedBuilder, testSink.builder);
}
示例29
@Test
public void testTableSink() {
// prepare parameters for Elasticsearch table sink
final TableSchema schema = createTestSchema();
final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
new JsonRowSerializationSchema(schema.toRowType()),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions());
// construct table sink using descriptors and table sink factory
final TestTableDescriptor testDesc = new TestTableDescriptor(
new Elasticsearch()
.version(getElasticsearchVersion())
.host(HOSTNAME, PORT, SCHEMA)
.index(INDEX)
.documentType(DOC_TYPE)
.keyDelimiter(KEY_DELIMITER)
.keyNullLiteral(KEY_NULL_LITERAL)
.bulkFlushBackoffExponential()
.bulkFlushBackoffDelay(123L)
.bulkFlushBackoffMaxRetries(3)
.bulkFlushInterval(100L)
.bulkFlushMaxActions(1000)
.bulkFlushMaxSize("1 MB")
.failureHandlerCustom(DummyFailureHandler.class)
.connectionMaxRetryTimeout(100)
.connectionPathPrefix("/myapp"))
.withFormat(
new Json()
.deriveSchema())
.withSchema(
new Schema()
.field(FIELD_KEY, Types.LONG())
.field(FIELD_FRUIT_NAME, Types.STRING())
.field(FIELD_COUNT, Types.DECIMAL())
.field(FIELD_TS, Types.SQL_TIMESTAMP()))
.inUpsertMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertEquals(expectedSink, actualSink);
}
示例30
@Test
public void testBuilder() {
final TableSchema schema = createTestSchema();
final IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator(INDEX, schema);
final TestElasticsearch7UpsertTableSink testSink = new TestElasticsearch7UpsertTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions());
final DataStreamMock dataStreamMock = new DataStreamMock(
new StreamExecutionEnvironmentMock(),
Types.TUPLE(Types.BOOLEAN, schema.toRowType()));
testSink.consumeDataStream(dataStreamMock);
final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
new ElasticsearchUpsertSinkFunction(
indexGenerator,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
XContentType.JSON,
Elasticsearch7UpsertTableSink.UPDATE_REQUEST_FACTORY,
new int[0]));
expectedBuilder.setFailureHandler(new DummyFailureHandler());
expectedBuilder.setBulkFlushBackoff(true);
expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
expectedBuilder.setBulkFlushBackoffDelay(123);
expectedBuilder.setBulkFlushBackoffRetries(3);
expectedBuilder.setBulkFlushInterval(100);
expectedBuilder.setBulkFlushMaxActions(1000);
expectedBuilder.setBulkFlushMaxSizeMb(1);
expectedBuilder.setRestClientFactory(new Elasticsearch7UpsertTableSink.DefaultRestClientFactory("/myapp"));
assertEquals(expectedBuilder, testSink.builder);
}