Java源码示例:org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

示例1
/**
 * Creates an Amazon Kinesis Client.
 * @param configProps configuration properties containing the access key, secret key, and region
 * @param awsClientConfig preconfigured AWS SDK client configuration
 * @return a new Amazon Kinesis Client
 */
public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
	// set a Flink-specific user agent
	awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
			EnvironmentInformation.getVersion(),
			EnvironmentInformation.getRevisionInformation().commitId));

	// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
	AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
			.withCredentials(AWSUtil.getCredentialsProvider(configProps))
			.withClientConfiguration(awsClientConfig);

	if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
		// Set signingRegion as null, to facilitate mocking Kinesis for local tests
		builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
												configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
												null));
	} else {
		builder.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
	}
	return builder.build();
}
 
示例2
public static void main(String[] args) throws Exception {
	ParameterTool pt = ParameterTool.fromArgs(args);

	StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
	see.setParallelism(1);

	DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());

	Properties kinesisProducerConfig = new Properties();
	kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
	kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
	kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));

	FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
			new SimpleStringSchema(), kinesisProducerConfig);

	kinesis.setFailOnError(true);
	kinesis.setDefaultStream("flink-test");
	kinesis.setDefaultPartition("0");

	simpleStringStream.addSink(kinesis);

	see.execute();
}
 
示例3
/**
 * Creates an Amazon Kinesis Client.
 * @param configProps configuration properties containing the access key, secret key, and region
 * @param awsClientConfig preconfigured AWS SDK client configuration
 * @return a new Amazon Kinesis Client
 */
public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
	// set a Flink-specific user agent
	awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
			EnvironmentInformation.getVersion(),
			EnvironmentInformation.getRevisionInformation().commitId));

	// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
	AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
			.withCredentials(AWSUtil.getCredentialsProvider(configProps))
			.withClientConfiguration(awsClientConfig);

	if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
		// Set signingRegion as null, to facilitate mocking Kinesis for local tests
		builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
												configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
												null));
	} else {
		builder.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
	}
	return builder.build();
}
 
示例4
public static void main(String[] args) throws Exception {
	ParameterTool pt = ParameterTool.fromArgs(args);

	StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
	see.setParallelism(1);

	DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());

	Properties kinesisProducerConfig = new Properties();
	kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
	kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
	kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));

	FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
			new SimpleStringSchema(), kinesisProducerConfig);

	kinesis.setFailOnError(true);
	kinesis.setDefaultStream("flink-test");
	kinesis.setDefaultPartition("0");

	simpleStringStream.addSink(kinesis);

	see.execute();
}
 
示例5
static Properties forAwsRegionConsumerProps(AwsRegion awsRegion) {
  final Properties properties = new Properties();

  if (awsRegion.isDefault()) {
    properties.setProperty(AWSConfigConstants.AWS_REGION, regionFromDefaultProviderChain());
  } else if (awsRegion.isId()) {
    properties.setProperty(AWSConfigConstants.AWS_REGION, awsRegion.asId().id());
  } else if (awsRegion.isCustomEndpoint()) {
    final AwsRegion.CustomEndpointAwsRegion customEndpoint = awsRegion.asCustomEndpoint();
    properties.setProperty(AWSConfigConstants.AWS_ENDPOINT, customEndpoint.serviceEndpoint());
    properties.setProperty(AWSConfigConstants.AWS_REGION, customEndpoint.regionId());
  } else {
    throw new IllegalStateException("Unrecognized AWS region configuration type: " + awsRegion);
  }

  return properties;
}
 
示例6
static Properties forAwsRegionProducerProps(AwsRegion awsRegion) {
  final Properties properties = new Properties();

  if (awsRegion.isDefault()) {
    properties.setProperty(AWSConfigConstants.AWS_REGION, regionFromDefaultProviderChain());
  } else if (awsRegion.isId()) {
    properties.setProperty(AWSConfigConstants.AWS_REGION, awsRegion.asId().id());
  } else if (awsRegion.isCustomEndpoint()) {
    final AwsRegion.CustomEndpointAwsRegion customEndpoint = awsRegion.asCustomEndpoint();

    final URI uri = URI.create(customEndpoint.serviceEndpoint());
    properties.setProperty("KinesisEndpoint", uri.getHost());
    properties.setProperty(AWSConfigConstants.AWS_REGION, customEndpoint.regionId());

    int port = uri.getPort();
    if (port != -1) {
      properties.setProperty("KinesisPort", String.valueOf(port));
    }
  } else {
    throw new IllegalStateException("Unrecognized AWS region configuration type: " + awsRegion);
  }

  return properties;
}
 
示例7
@Test
public void awsBasicCredentialsProperties() {
  final Properties properties =
      AwsAuthConfigProperties.forAwsCredentials(
          AwsCredentials.basic("fake-access-key-id", "fake-secret-access-key"));

  assertThat(properties.entrySet(), hasSize(3));
  assertThat(
      properties,
      hasEntry(
          AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
          AWSConfigConstants.CredentialProvider.BASIC.name()));
  assertThat(
      properties,
      hasEntry(
          AWSConfigConstants.accessKeyId(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
          "fake-access-key-id"));
  assertThat(
      properties,
      hasEntry(
          AWSConfigConstants.secretKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
          "fake-secret-access-key"));
}
 
示例8
@Test
public void awsProfileCredentialsProperties() {
  final Properties properties =
      AwsAuthConfigProperties.forAwsCredentials(
          AwsCredentials.profile("fake-profile", "/fake/profile/path"));

  assertThat(properties.entrySet(), hasSize(3));
  assertThat(
      properties,
      hasEntry(
          AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
          AWSConfigConstants.CredentialProvider.PROFILE.name()));
  assertThat(
      properties,
      hasEntry(
          AWSConfigConstants.profileName(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
          "fake-profile"));
  assertThat(
      properties,
      hasEntry(
          AWSConfigConstants.profilePath(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER),
          "/fake/profile/path"));
}
 
示例9
/**
 * Creates an Amazon Kinesis Client.
 * @param configProps configuration properties containing the access key, secret key, and region
 * @param awsClientConfig preconfigured AWS SDK client configuration
 * @return a new Amazon Kinesis Client
 */
public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
	// set a Flink-specific user agent
	awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
			EnvironmentInformation.getVersion(),
			EnvironmentInformation.getRevisionInformation().commitId));

	// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
	AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
			.withCredentials(AWSUtil.getCredentialsProvider(configProps))
			.withClientConfiguration(awsClientConfig);

	if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
		// If an endpoint is specified, we give preference to using an endpoint and use the region property to
		// sign the request.
		builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
			configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
			configProps.getProperty(AWSConfigConstants.AWS_REGION)));
	} else {
		builder.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
	}
	return builder.build();
}
 
示例10
public static void main(String[] args) throws Exception {
	ParameterTool pt = ParameterTool.fromArgs(args);

	StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
	see.setParallelism(1);

	DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());

	Properties kinesisProducerConfig = new Properties();
	kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
	kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
	kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));

	FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
			new SimpleStringSchema(), kinesisProducerConfig);

	kinesis.setFailOnError(true);
	kinesis.setDefaultStream("flink-test");
	kinesis.setDefaultPartition("0");

	simpleStringStream.addSink(kinesis);

	see.execute();
}
 
示例11
/**
 * Validate configuration properties for {@link FlinkKinesisProducer},
 * and return a constructed KinesisProducerConfiguration.
 */
public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) {
	checkNotNull(config, "config can not be null");

	validateAwsConfiguration(config);

	if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
		// per requirement in Amazon Kinesis Producer Library
		throw new IllegalArgumentException(String.format("For FlinkKinesisProducer AWS region ('%s') must be set in the config.", AWSConfigConstants.AWS_REGION));
	}

	KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
	kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));

	kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));

	// we explicitly lower the credential refresh delay (default is 5 seconds)
	// to avoid an ignorable interruption warning that occurs when shutting down the
	// KPL client. See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
	kpc.setCredentialsRefreshDelay(100);

	// Override default values if they aren't specified by users
	if (!config.containsKey(RATE_LIMIT)) {
		kpc.setRateLimit(DEFAULT_RATE_LIMIT);
	}
	if (!config.containsKey(THREADING_MODEL)) {
		kpc.setThreadingModel(DEFAULT_THREADING_MODEL);
	}
	if (!config.containsKey(THREAD_POOL_SIZE)) {
		kpc.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
	}

	return kpc;
}
 
示例12
/**
 * Get standard Kinesis-related config properties.
 */
public static Properties getStandardProperties() {
	Properties config = new Properties();
	config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
	config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

	return config;
}
 
示例13
@Test
public void testClientConfigOverride() {

	Properties configProps = new Properties();
	configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	configProps.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "socketTimeout", "9999");

	KinesisProxyInterface proxy = KinesisProxy.create(configProps);

	AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
	ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
		"clientConfiguration");
	assertEquals(9999, clientConfiguration.getSocketTimeout());
}
 
示例14
@Test
public void testUnparsableLongForProducerConfiguration() {
	exception.expect(IllegalArgumentException.class);
	exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");

	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	testConfig.setProperty("RateLimit", "unparsableLong");

	KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
}
 
示例15
@Test
public void testRateLimitInProducerConfiguration() {
	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(100, kpc.getRateLimit());

	testConfig.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
	kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(150, kpc.getRateLimit());
}
 
示例16
@Test
public void testThreadingModelInProducerConfiguration() {
	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, kpc.getThreadingModel());

	testConfig.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
	kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, kpc.getThreadingModel());
}
 
示例17
@Test
public void testThreadPoolSizeInProducerConfiguration() {
	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(10, kpc.getThreadPoolSize());

	testConfig.setProperty(KinesisConfigUtil.THREAD_POOL_SIZE, "12");
	kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(12, kpc.getThreadPoolSize());
}
 
示例18
@Test
public void testReplaceDeprecatedKeys() {
	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	// these deprecated keys should be replaced
	testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1");
	testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2");
	Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig);

	assertEquals("1", replacedConfig.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT));
	assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT));
}
 
示例19
@Test
public void testCorrectlySetRegionInProducerConfiguration() {
	String region = "us-east-1";
	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
	KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals("incorrect region", region, kpc.getRegion());
}
 
示例20
@Test
public void testMissingAwsRegionInProducerConfig() {
	String expectedMessage = String.format("For FlinkKinesisProducer AWS region ('%s') must be set in the config.",
			AWSConfigConstants.AWS_REGION);
	exception.expect(IllegalArgumentException.class);
	exception.expectMessage(expectedMessage);

	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
	testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

	KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
}
 
示例21
@Test
public void testUnrecognizableAwsRegionInConfig() {
	exception.expect(IllegalArgumentException.class);
	exception.expectMessage("Invalid AWS region");

	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
	testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
	testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

	KinesisConfigUtil.validateAwsConfiguration(testConfig);
}
 
示例22
@Test
public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
	exception.expect(IllegalArgumentException.class);
	exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
			"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");

	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");

	KinesisConfigUtil.validateAwsConfiguration(testConfig);
}
 
示例23
@Test
public void testUnrecognizableCredentialProviderTypeInConfig() {
	exception.expect(IllegalArgumentException.class);
	exception.expectMessage("Invalid AWS Credential Provider Type");

	Properties testConfig = TestUtils.getStandardProperties();
	testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");

	KinesisConfigUtil.validateAwsConfiguration(testConfig);
}
 
示例24
@Test
public void testAwsRegionOrEndpointInConsumerConfig() {
	String expectedMessage = String.format("For FlinkKinesisConsumer either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
			AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT);
	exception.expect(IllegalArgumentException.class);
	exception.expectMessage(expectedMessage);

	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	testConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "fake");
	testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
	testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

	KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
 
示例25
/**
 * Get standard Kinesis-related config properties.
 */
public static Properties getStandardProperties() {
	Properties config = new Properties();
	config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
	config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

	return config;
}
 
示例26
/**
 * Validate configuration properties for {@link FlinkKinesisProducer},
 * and return a constructed KinesisProducerConfiguration.
 */
public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) {
	checkNotNull(config, "config can not be null");

	validateAwsConfiguration(config);

	if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
		// per requirement in Amazon Kinesis Producer Library
		throw new IllegalArgumentException(String.format("For FlinkKinesisProducer AWS region ('%s') must be set in the config.", AWSConfigConstants.AWS_REGION));
	}

	KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
	kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));

	kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));

	// we explicitly lower the credential refresh delay (default is 5 seconds)
	// to avoid an ignorable interruption warning that occurs when shutting down the
	// KPL client. See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
	kpc.setCredentialsRefreshDelay(100);

	// Override default values if they aren't specified by users
	if (!config.containsKey(RATE_LIMIT)) {
		kpc.setRateLimit(DEFAULT_RATE_LIMIT);
	}
	if (!config.containsKey(THREADING_MODEL)) {
		kpc.setThreadingModel(DEFAULT_THREADING_MODEL);
	}
	if (!config.containsKey(THREAD_POOL_SIZE)) {
		kpc.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
	}

	return kpc;
}
 
示例27
@Test
public void testClientConfigOverride() {

	Properties configProps = new Properties();
	configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	configProps.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "socketTimeout", "9999");

	KinesisProxyInterface proxy = KinesisProxy.create(configProps);

	AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
	ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
		"clientConfiguration");
	assertEquals(9999, clientConfiguration.getSocketTimeout());
}
 
示例28
@Test
public void testUnparsableLongForProducerConfiguration() {
	exception.expect(IllegalArgumentException.class);
	exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");

	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	testConfig.setProperty("RateLimit", "unparsableLong");

	KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
}
 
示例29
@Test
public void testRateLimitInProducerConfiguration() {
	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(100, kpc.getRateLimit());

	testConfig.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
	kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(150, kpc.getRateLimit());
}
 
示例30
@Test
public void testThreadingModelInProducerConfiguration() {
	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, kpc.getThreadingModel());

	testConfig.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
	kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);

	assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, kpc.getThreadingModel());
}