Java源码示例:org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host

示例1
@Override
protected ElasticsearchUpsertTableSinkBase getExpectedTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {
	return new Elasticsearch6UpsertTableSink(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例2
public TestElasticsearch6UpsertTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {

	super(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例3
@Override
protected ElasticsearchUpsertTableSinkBase getExpectedTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {
	return new Elasticsearch6UpsertTableSink(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例4
public TestElasticsearch6UpsertTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {

	super(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例5
@Override
protected ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {

	return new Elasticsearch7UpsertTableSink(
		isAppendOnly,
		schema,
		hosts,
		index,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例6
@Override
protected ElasticsearchUpsertTableSinkBase getExpectedTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions,
		IndexGenerator indexGenerator) {
	return new Elasticsearch7UpsertTableSink(
		isAppendOnly,
		schema,
		hosts,
		index,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例7
public TestElasticsearch7UpsertTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {

	super(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例8
public TestElasticsearch6UpsertTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {

	super(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例9
@Override
protected ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {

	return new Elasticsearch6UpsertTableSink(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例10
protected abstract ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
 
示例11
private List<Host> getHosts(DescriptorProperties descriptorProperties) {
	final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
		CONNECTOR_HOSTS,
		Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
	return hosts.stream()
		.map(host -> new Host(
			descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
			descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
			descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))))
		.collect(Collectors.toList());
}
 
示例12
protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
 
示例13
@Override
protected ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {

	return new Elasticsearch6UpsertTableSink(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例14
protected abstract ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
 
示例15
private List<Host> getHosts(DescriptorProperties descriptorProperties) {
	final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
		CONNECTOR_HOSTS,
		Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
	return hosts.stream()
		.map(host -> new Host(
			descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
			descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
			descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))))
		.collect(Collectors.toList());
}
 
示例16
protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
 
示例17
@Override
protected ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions) {

	return new Elasticsearch6UpsertTableSink(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例18
@Override
protected ElasticsearchUpsertTableSinkBase getExpectedTableSink(
		boolean isAppendOnly,
		TableSchema schema,
		List<Host> hosts,
		String index,
		String docType,
		String keyDelimiter,
		String keyNullLiteral,
		SerializationSchema<Row> serializationSchema,
		XContentType contentType,
		ActionRequestFailureHandler failureHandler,
		Map<SinkOption, String> sinkOptions,
		IndexGenerator indexGenerator) {
	return new Elasticsearch6UpsertTableSink(
		isAppendOnly,
		schema,
		hosts,
		index,
		docType,
		keyDelimiter,
		keyNullLiteral,
		serializationSchema,
		contentType,
		failureHandler,
		sinkOptions);
}
 
示例19
@Override
protected Map<String, String> toConnectorProperties() {
	final DescriptorProperties properties = new DescriptorProperties();
	properties.putProperties(internalProperties);

	if (hosts.size() > 0) {
		properties.putString(
			CONNECTOR_HOSTS,
			hosts.stream()
				.map(Host::toString)
				.collect(Collectors.joining(";")));
	}
	return properties.asMap();
}
 
示例20
/**
 * Parse Hosts String to list.
 *
 * <p>Hosts String format was given as following:
 *
 * <pre>
 *     connector.hosts = http://host_name:9092;http://host_name:9093
 * </pre>
 */
public static List<Host> validateAndParseHostsString(DescriptorProperties descriptorProperties) {
	final List<Host> hostList = new ArrayList<>();

	descriptorProperties.validateString(CONNECTOR_HOSTS, false, 1);
	final String hostsStr = descriptorProperties.getString(CONNECTOR_HOSTS);

	final String[] hosts = hostsStr.split(";");
	final String validationExceptionMessage = "Properties '" + CONNECTOR_HOSTS + "' format should " +
		"follow the format 'http://host_name:port', but is '" + hostsStr + "'.";

	if (hosts.length == 0) {
		throw new ValidationException(validationExceptionMessage);
	}
	for (String host : hosts) {
		try {
			final URL url = new URL(host);
			final String protocol = url.getProtocol();
			final String hostName = url.getHost();
			final int hostPort = url.getPort();

			if (StringUtils.isNullOrWhitespaceOnly(protocol) ||
				StringUtils.isNullOrWhitespaceOnly(hostName) ||
				-1 == hostPort) {
				throw new ValidationException(validationExceptionMessage);
			}

			hostList.add(new Host(hostName, hostPort, protocol));
		} catch (MalformedURLException e) {
			throw new ValidationException(validationExceptionMessage, e);
		}
	}
	return hostList;
}
 
示例21
protected abstract ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
 
示例22
private List<Host> getHosts(DescriptorProperties descriptorProperties) {
	if (descriptorProperties.containsKey(CONNECTOR_HOSTS)) {
		return validateAndParseHostsString(descriptorProperties);
	} else {
		final List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
			CONNECTOR_HOSTS,
			Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL));
		return hosts.stream()
			.map(host -> new Host(
				descriptorProperties.getString(host.get(CONNECTOR_HOSTS_HOSTNAME)),
				descriptorProperties.getInt(host.get(CONNECTOR_HOSTS_PORT)),
				descriptorProperties.getString(host.get(CONNECTOR_HOSTS_PROTOCOL))))
			.collect(Collectors.toList());
	}
}
 
示例23
@Test
public void testTableSink() {
	// prepare parameters for Elasticsearch table sink

	final TableSchema schema = createTestSchema();

	final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions(),
		IndexGeneratorFactory.createIndexGenerator(INDEX, schema));

	// construct table sink using descriptors and table sink factory
	final Map<String, String> elasticSearchProperties = createElasticSearchProperties();
	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, elasticSearchProperties)
		.createStreamTableSink(elasticSearchProperties);

	assertEquals(expectedSink, actualSink);
}
 
示例24
@Test
public void testTableSinkWithLegacyProperties() {
	// prepare parameters for Elasticsearch table sink
	final TableSchema schema = createTestSchema();

	final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions(),
		IndexGeneratorFactory.createIndexGenerator(INDEX, schema));

	// construct table sink using descriptors and table sink factory
	final Map<String, String> elasticSearchProperties = createElasticSearchProperties();

	final Map<String, String> legacyPropertiesMap = new HashMap<>();
	legacyPropertiesMap.putAll(elasticSearchProperties);
	// use legacy properties
	legacyPropertiesMap.remove("connector.hosts");

	legacyPropertiesMap.put("connector.hosts.0.hostname", "host1");
	legacyPropertiesMap.put("connector.hosts.0.port", "1234");
	legacyPropertiesMap.put("connector.hosts.0.protocol", "https");

	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, legacyPropertiesMap)
		.createStreamTableSink(legacyPropertiesMap);

	assertEquals(expectedSink, actualSink);
}
 
示例25
protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions,
IndexGenerator indexGenerator);
 
示例26
@Test
public void testBuilder() {
	final TableSchema schema = createTestSchema();

	final TestElasticsearch6UpsertTableSink testSink = new TestElasticsearch6UpsertTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		new JsonRowSerializationSchema(schema.toRowType()),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	final DataStreamMock dataStreamMock = new DataStreamMock(
			new StreamExecutionEnvironmentMock(),
			Types.TUPLE(Types.BOOLEAN, schema.toRowType()));

	testSink.emitDataStream(dataStreamMock);

	final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
		Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
		new ElasticsearchUpsertSinkFunction(
			INDEX,
			DOC_TYPE,
			KEY_DELIMITER,
			KEY_NULL_LITERAL,
			new JsonRowSerializationSchema(schema.toRowType()),
			XContentType.JSON,
			Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
			new int[0]));
	expectedBuilder.setFailureHandler(new DummyFailureHandler());
	expectedBuilder.setBulkFlushBackoff(true);
	expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
	expectedBuilder.setBulkFlushBackoffDelay(123);
	expectedBuilder.setBulkFlushBackoffRetries(3);
	expectedBuilder.setBulkFlushInterval(100);
	expectedBuilder.setBulkFlushMaxActions(1000);
	expectedBuilder.setBulkFlushMaxSizeMb(1);
	expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));

	assertEquals(expectedBuilder, testSink.builder);
}
 
示例27
@Test
public void testTableSink() {
	// prepare parameters for Elasticsearch table sink

	final TableSchema schema = createTestSchema();

	final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		new JsonRowSerializationSchema(schema.toRowType()),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// construct table sink using descriptors and table sink factory

	final TestTableDescriptor testDesc = new TestTableDescriptor(
			new Elasticsearch()
				.version(getElasticsearchVersion())
				.host(HOSTNAME, PORT, SCHEMA)
				.index(INDEX)
				.documentType(DOC_TYPE)
				.keyDelimiter(KEY_DELIMITER)
				.keyNullLiteral(KEY_NULL_LITERAL)
				.bulkFlushBackoffExponential()
				.bulkFlushBackoffDelay(123L)
				.bulkFlushBackoffMaxRetries(3)
				.bulkFlushInterval(100L)
				.bulkFlushMaxActions(1000)
				.bulkFlushMaxSize("1 MB")
				.failureHandlerCustom(DummyFailureHandler.class)
				.connectionMaxRetryTimeout(100)
				.connectionPathPrefix("/myapp"))
		.withFormat(
			new Json()
				.deriveSchema())
		.withSchema(
			new Schema()
				.field(FIELD_KEY, Types.LONG())
				.field(FIELD_FRUIT_NAME, Types.STRING())
				.field(FIELD_COUNT, Types.DECIMAL())
				.field(FIELD_TS, Types.SQL_TIMESTAMP()))
		.inUpsertMode();

	final Map<String, String> propertiesMap = testDesc.toProperties();
	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
		.createStreamTableSink(propertiesMap);

	assertEquals(expectedSink, actualSink);
}
 
示例28
@Test
public void testBuilder() {
	final TableSchema schema = createTestSchema();

	final TestElasticsearch6UpsertTableSink testSink = new TestElasticsearch6UpsertTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		new JsonRowSerializationSchema(schema.toRowType()),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	final DataStreamMock dataStreamMock = new DataStreamMock(
			new StreamExecutionEnvironmentMock(),
			Types.TUPLE(Types.BOOLEAN, schema.toRowType()));

	testSink.emitDataStream(dataStreamMock);

	final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
		Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
		new ElasticsearchUpsertSinkFunction(
			INDEX,
			DOC_TYPE,
			KEY_DELIMITER,
			KEY_NULL_LITERAL,
			new JsonRowSerializationSchema(schema.toRowType()),
			XContentType.JSON,
			Elasticsearch6UpsertTableSink.UPDATE_REQUEST_FACTORY,
			new int[0]));
	expectedBuilder.setFailureHandler(new DummyFailureHandler());
	expectedBuilder.setBulkFlushBackoff(true);
	expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
	expectedBuilder.setBulkFlushBackoffDelay(123);
	expectedBuilder.setBulkFlushBackoffRetries(3);
	expectedBuilder.setBulkFlushInterval(100);
	expectedBuilder.setBulkFlushMaxActions(1000);
	expectedBuilder.setBulkFlushMaxSizeMb(1);
	expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));

	assertEquals(expectedBuilder, testSink.builder);
}
 
示例29
@Test
public void testTableSink() {
	// prepare parameters for Elasticsearch table sink

	final TableSchema schema = createTestSchema();

	final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		new JsonRowSerializationSchema(schema.toRowType()),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	// construct table sink using descriptors and table sink factory

	final TestTableDescriptor testDesc = new TestTableDescriptor(
			new Elasticsearch()
				.version(getElasticsearchVersion())
				.host(HOSTNAME, PORT, SCHEMA)
				.index(INDEX)
				.documentType(DOC_TYPE)
				.keyDelimiter(KEY_DELIMITER)
				.keyNullLiteral(KEY_NULL_LITERAL)
				.bulkFlushBackoffExponential()
				.bulkFlushBackoffDelay(123L)
				.bulkFlushBackoffMaxRetries(3)
				.bulkFlushInterval(100L)
				.bulkFlushMaxActions(1000)
				.bulkFlushMaxSize("1 MB")
				.failureHandlerCustom(DummyFailureHandler.class)
				.connectionMaxRetryTimeout(100)
				.connectionPathPrefix("/myapp"))
		.withFormat(
			new Json()
				.deriveSchema())
		.withSchema(
			new Schema()
				.field(FIELD_KEY, Types.LONG())
				.field(FIELD_FRUIT_NAME, Types.STRING())
				.field(FIELD_COUNT, Types.DECIMAL())
				.field(FIELD_TS, Types.SQL_TIMESTAMP()))
		.inUpsertMode();

	final Map<String, String> propertiesMap = testDesc.toProperties();
	final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
		.createStreamTableSink(propertiesMap);

	assertEquals(expectedSink, actualSink);
}
 
示例30
@Test
public void testBuilder() {
	final TableSchema schema = createTestSchema();
	final IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator(INDEX, schema);

	final TestElasticsearch7UpsertTableSink testSink = new TestElasticsearch7UpsertTableSink(
		false,
		schema,
		Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
		INDEX,
		DOC_TYPE,
		KEY_DELIMITER,
		KEY_NULL_LITERAL,
		JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
		XContentType.JSON,
		new DummyFailureHandler(),
		createTestSinkOptions());

	final DataStreamMock dataStreamMock = new DataStreamMock(
			new StreamExecutionEnvironmentMock(),
			Types.TUPLE(Types.BOOLEAN, schema.toRowType()));

	testSink.consumeDataStream(dataStreamMock);

	final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
		Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
		new ElasticsearchUpsertSinkFunction(
			indexGenerator,
			DOC_TYPE,
			KEY_DELIMITER,
			KEY_NULL_LITERAL,
			JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
			XContentType.JSON,
			Elasticsearch7UpsertTableSink.UPDATE_REQUEST_FACTORY,
			new int[0]));
	expectedBuilder.setFailureHandler(new DummyFailureHandler());
	expectedBuilder.setBulkFlushBackoff(true);
	expectedBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
	expectedBuilder.setBulkFlushBackoffDelay(123);
	expectedBuilder.setBulkFlushBackoffRetries(3);
	expectedBuilder.setBulkFlushInterval(100);
	expectedBuilder.setBulkFlushMaxActions(1000);
	expectedBuilder.setBulkFlushMaxSizeMb(1);
	expectedBuilder.setRestClientFactory(new Elasticsearch7UpsertTableSink.DefaultRestClientFactory("/myapp"));
	assertEquals(expectedBuilder, testSink.builder);
}