Java源码示例:org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
示例1
/**
 * Creates an Amazon Kinesis Client.
 *
 * <p>If {@code AWSConfigConstants.AWS_ENDPOINT} is present in {@code configProps}, the client is
 * bound to that endpoint and requests are signed with the value of
 * {@code AWSConfigConstants.AWS_REGION}. When no region is configured that value is {@code null},
 * which keeps endpoint-only setups (e.g. a locally mocked Kinesis) working as before.
 * Otherwise the client is configured from the region property alone.
 *
 * @param configProps configuration properties containing the access key, secret key, and region
 * @param awsClientConfig preconfigured AWS SDK client configuration; its user agent prefix is
 *     overwritten with a Flink-specific value
 * @return a new Amazon Kinesis Client
 */
public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
    // set a Flink-specific user agent
    awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
        EnvironmentInformation.getVersion(),
        EnvironmentInformation.getRevisionInformation().commitId));

    // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
    AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
        .withCredentials(AWSUtil.getCredentialsProvider(configProps))
        .withClientConfiguration(awsClientConfig);

    if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
        // An explicit endpoint takes precedence. The configured region (if any) is used to sign
        // requests; getProperty yields null when the region is absent, i.e. the previous behaviour.
        builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
            configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
            configProps.getProperty(AWSConfigConstants.AWS_REGION)));
    } else {
        builder.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
    }
    return builder.build();
}
示例2
/**
 * Example job that writes generated events to a Kinesis stream.
 *
 * <p>Requires the command-line parameters {@code region}, {@code accessKey} and
 * {@code secretKey}. Records are sent to the stream {@code flink-test}, partition {@code 0}.
 *
 * @param args command-line arguments parsed via {@code ParameterTool}
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);

    // single-parallelism pipeline fed by the event generator
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    final DataStream<String> events = env.addSource(new EventsGenerator());

    // AWS settings for the producer, all taken from required arguments
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, params.getRequired("region"));
    producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, params.getRequired("accessKey"));
    producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, params.getRequired("secretKey"));

    final FlinkKinesisProducer<String> producerSink =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerProps);
    producerSink.setFailOnError(true);
    producerSink.setDefaultStream("flink-test");
    producerSink.setDefaultPartition("0");

    events.addSink(producerSink);
    env.execute();
}
示例3
/**
 * Creates an Amazon Kinesis Client.
 *
 * <p>If {@code AWSConfigConstants.AWS_ENDPOINT} is present in {@code configProps}, the client is
 * bound to that endpoint and requests are signed with the value of
 * {@code AWSConfigConstants.AWS_REGION}. When no region is configured that value is {@code null},
 * which keeps endpoint-only setups (e.g. a locally mocked Kinesis) working as before.
 * Otherwise the client is configured from the region property alone.
 *
 * @param configProps configuration properties containing the access key, secret key, and region
 * @param awsClientConfig preconfigured AWS SDK client configuration; its user agent prefix is
 *     overwritten with a Flink-specific value
 * @return a new Amazon Kinesis Client
 */
public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
    // set a Flink-specific user agent
    awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
        EnvironmentInformation.getVersion(),
        EnvironmentInformation.getRevisionInformation().commitId));

    // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
    AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
        .withCredentials(AWSUtil.getCredentialsProvider(configProps))
        .withClientConfiguration(awsClientConfig);

    if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
        // An explicit endpoint takes precedence. The configured region (if any) is used to sign
        // requests; getProperty yields null when the region is absent, i.e. the previous behaviour.
        builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
            configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
            configProps.getProperty(AWSConfigConstants.AWS_REGION)));
    } else {
        builder.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
    }
    return builder.build();
}
示例4
/**
 * Example job that writes generated events to a Kinesis stream.
 *
 * <p>Requires the command-line parameters {@code region}, {@code accessKey} and
 * {@code secretKey}. Records are sent to the stream {@code flink-test}, partition {@code 0}.
 *
 * @param args command-line arguments parsed via {@code ParameterTool}
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);

    // single-parallelism pipeline fed by the event generator
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    final DataStream<String> events = env.addSource(new EventsGenerator());

    // AWS settings for the producer, all taken from required arguments
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, params.getRequired("region"));
    producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, params.getRequired("accessKey"));
    producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, params.getRequired("secretKey"));

    final FlinkKinesisProducer<String> producerSink =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerProps);
    producerSink.setFailOnError(true);
    producerSink.setDefaultStream("flink-test");
    producerSink.setDefaultPartition("0");

    events.addSink(producerSink);
    env.execute();
}
示例5
/**
 * Translates an {@code AwsRegion} into Kinesis consumer properties.
 *
 * <p>A default region resolves the region id via {@code regionFromDefaultProviderChain()}, an
 * id-based region uses its id, and a custom endpoint sets both the endpoint and the region.
 *
 * @param awsRegion the region configuration to translate
 * @return a new {@code Properties} instance holding the region (and endpoint, if custom)
 * @throws IllegalStateException if the region is none of the recognized kinds
 */
static Properties forAwsRegionConsumerProps(AwsRegion awsRegion) {
    final Properties consumerProps = new Properties();

    if (awsRegion.isDefault()) {
        consumerProps.setProperty(AWSConfigConstants.AWS_REGION, regionFromDefaultProviderChain());
        return consumerProps;
    }
    if (awsRegion.isId()) {
        consumerProps.setProperty(AWSConfigConstants.AWS_REGION, awsRegion.asId().id());
        return consumerProps;
    }
    if (!awsRegion.isCustomEndpoint()) {
        throw new IllegalStateException("Unrecognized AWS region configuration type: " + awsRegion);
    }

    // custom endpoint: both the endpoint and the region are carried over
    final AwsRegion.CustomEndpointAwsRegion endpointRegion = awsRegion.asCustomEndpoint();
    consumerProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, endpointRegion.serviceEndpoint());
    consumerProps.setProperty(AWSConfigConstants.AWS_REGION, endpointRegion.regionId());
    return consumerProps;
}
示例6
/**
 * Translates an {@code AwsRegion} into Kinesis producer properties.
 *
 * <p>A default region resolves the region id via {@code regionFromDefaultProviderChain()} and an
 * id-based region uses its id. For a custom endpoint the service endpoint is parsed as a URI:
 * its host is stored under {@code "KinesisEndpoint"} and, when the URI carries an explicit port,
 * that port is stored under {@code "KinesisPort"}.
 *
 * @param awsRegion the region configuration to translate
 * @return a new {@code Properties} instance holding the producer's region/endpoint settings
 * @throws IllegalStateException if the region is none of the recognized kinds
 */
static Properties forAwsRegionProducerProps(AwsRegion awsRegion) {
    final Properties producerProps = new Properties();

    if (awsRegion.isDefault()) {
        producerProps.setProperty(AWSConfigConstants.AWS_REGION, regionFromDefaultProviderChain());
        return producerProps;
    }
    if (awsRegion.isId()) {
        producerProps.setProperty(AWSConfigConstants.AWS_REGION, awsRegion.asId().id());
        return producerProps;
    }
    if (!awsRegion.isCustomEndpoint()) {
        throw new IllegalStateException("Unrecognized AWS region configuration type: " + awsRegion);
    }

    // custom endpoint: split the endpoint URI into host and (optional) port
    final AwsRegion.CustomEndpointAwsRegion endpointRegion = awsRegion.asCustomEndpoint();
    final URI endpointUri = URI.create(endpointRegion.serviceEndpoint());
    producerProps.setProperty("KinesisEndpoint", endpointUri.getHost());
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, endpointRegion.regionId());

    // URI#getPort reports -1 when the endpoint has no explicit port
    final int endpointPort = endpointUri.getPort();
    if (endpointPort != -1) {
        producerProps.setProperty("KinesisPort", String.valueOf(endpointPort));
    }
    return producerProps;
}
示例7
/**
 * Checks that basic AWS credentials are translated into exactly three properties: the provider
 * type plus the access key id and secret key entries.
 */
@Test
public void awsBasicCredentialsProperties() {
    final String providerKey = AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;

    final Properties authProps =
        AwsAuthConfigProperties.forAwsCredentials(
            AwsCredentials.basic("fake-access-key-id", "fake-secret-access-key"));

    // nothing besides the three expected entries
    assertThat(authProps.entrySet(), hasSize(3));

    assertThat(authProps, hasEntry(providerKey, AWSConfigConstants.CredentialProvider.BASIC.name()));
    assertThat(authProps, hasEntry(AWSConfigConstants.accessKeyId(providerKey), "fake-access-key-id"));
    assertThat(authProps, hasEntry(AWSConfigConstants.secretKey(providerKey), "fake-secret-access-key"));
}
示例8
/**
 * Checks that profile-based AWS credentials are translated into exactly three properties: the
 * provider type plus the profile name and profile path entries.
 */
@Test
public void awsProfileCredentialsProperties() {
    final String providerKey = AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;

    final Properties authProps =
        AwsAuthConfigProperties.forAwsCredentials(
            AwsCredentials.profile("fake-profile", "/fake/profile/path"));

    // nothing besides the three expected entries
    assertThat(authProps.entrySet(), hasSize(3));

    assertThat(authProps, hasEntry(providerKey, AWSConfigConstants.CredentialProvider.PROFILE.name()));
    assertThat(authProps, hasEntry(AWSConfigConstants.profileName(providerKey), "fake-profile"));
    assertThat(authProps, hasEntry(AWSConfigConstants.profilePath(providerKey), "/fake/profile/path"));
}
示例9
/**
 * Builds an Amazon Kinesis client from the given configuration.
 *
 * <p>When {@code AWSConfigConstants.AWS_ENDPOINT} is configured, the endpoint takes precedence
 * and the region property is only used as the signing region; otherwise the client is
 * configured from the region property.
 *
 * @param configProps configuration properties containing the access key, secret key, and region
 * @param awsClientConfig preconfigured AWS SDK client configuration; its user agent prefix is
 *     overwritten with a Flink-specific value
 * @return a new Amazon Kinesis Client
 */
public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
    // tag requests with a Flink-specific user agent
    final String flinkUserAgent = String.format(
        USER_AGENT_FORMAT,
        EnvironmentInformation.getVersion(),
        EnvironmentInformation.getRevisionInformation().commitId);
    awsClientConfig.setUserAgentPrefix(flinkUserAgent);

    // hand over the provider itself (not resolved credentials) so that credentials can refresh
    final AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard()
        .withCredentials(AWSUtil.getCredentialsProvider(configProps))
        .withClientConfiguration(awsClientConfig);

    final String regionName = configProps.getProperty(AWSConfigConstants.AWS_REGION);

    if (!configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
        clientBuilder.withRegion(Regions.fromName(regionName));
        return clientBuilder.build();
    }

    // An explicit endpoint wins; the region property is used to sign the request.
    final String endpoint = configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT);
    clientBuilder.withEndpointConfiguration(
        new AwsClientBuilder.EndpointConfiguration(endpoint, regionName));
    return clientBuilder.build();
}
示例10
/**
 * Example job that writes generated events to a Kinesis stream.
 *
 * <p>Requires the command-line parameters {@code region}, {@code accessKey} and
 * {@code secretKey}. Records are sent to the stream {@code flink-test}, partition {@code 0}.
 *
 * @param args command-line arguments parsed via {@code ParameterTool}
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);

    // single-parallelism pipeline fed by the event generator
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    final DataStream<String> events = env.addSource(new EventsGenerator());

    // AWS settings for the producer, all taken from required arguments
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, params.getRequired("region"));
    producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, params.getRequired("accessKey"));
    producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, params.getRequired("secretKey"));

    final FlinkKinesisProducer<String> producerSink =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerProps);
    producerSink.setFailOnError(true);
    producerSink.setDefaultStream("flink-test");
    producerSink.setDefaultPartition("0");

    events.addSink(producerSink);
    env.execute();
}
示例11
/**
 * Validates the configuration properties for {@link FlinkKinesisProducer} and builds the
 * corresponding KinesisProducerConfiguration.
 *
 * <p>Rate limit, threading model and thread pool size fall back to this class's defaults
 * whenever the user did not specify them.
 *
 * @param config the producer properties; must not be {@code null} and must contain the AWS region
 * @return the producer configuration derived from {@code config}
 * @throws IllegalArgumentException if the AWS region is not set in {@code config}
 */
public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) {
    checkNotNull(config, "config can not be null");
    validateAwsConfiguration(config);

    // the Amazon Kinesis Producer Library requires a region
    final boolean regionConfigured = config.containsKey(AWSConfigConstants.AWS_REGION);
    if (!regionConfigured) {
        final String message = String.format(
            "For FlinkKinesisProducer AWS region ('%s') must be set in the config.",
            AWSConfigConstants.AWS_REGION);
        throw new IllegalArgumentException(message);
    }

    final KinesisProducerConfiguration producerConfig = KinesisProducerConfiguration.fromProperties(config);
    producerConfig.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));
    producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));

    // The credential refresh delay is explicitly lowered (default is 5 seconds) to avoid an
    // ignorable interruption warning that occurs when shutting down the KPL client.
    // See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
    producerConfig.setCredentialsRefreshDelay(100);

    // apply our defaults only for settings the user left untouched
    final boolean userSetRateLimit = config.containsKey(RATE_LIMIT);
    if (!userSetRateLimit) {
        producerConfig.setRateLimit(DEFAULT_RATE_LIMIT);
    }
    final boolean userSetThreadingModel = config.containsKey(THREADING_MODEL);
    if (!userSetThreadingModel) {
        producerConfig.setThreadingModel(DEFAULT_THREADING_MODEL);
    }
    final boolean userSetThreadPoolSize = config.containsKey(THREAD_POOL_SIZE);
    if (!userSetThreadPoolSize) {
        producerConfig.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
    }

    return producerConfig;
}
示例12
/**
 * Builds a fresh set of standard Kinesis-related config properties: region {@code us-east-1}
 * together with placeholder access key id and secret key values.
 *
 * @return a new, independent {@code Properties} instance
 */
public static Properties getStandardProperties() {
    final Properties standardProps = new Properties();

    // placeholder credentials
    standardProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    standardProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

    standardProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    return standardProps;
}
示例13
/**
 * Checks that a property prefixed with {@code AWSUtil.AWS_CLIENT_CONFIG_PREFIX} ends up in the
 * client configuration of the Kinesis client held by the proxy.
 */
@Test
public void testClientConfigOverride() {
    final String socketTimeoutKey = AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "socketTimeout";

    final Properties proxyProps = new Properties();
    proxyProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    proxyProps.setProperty(socketTimeoutKey, "9999");

    final KinesisProxyInterface kinesisProxy = KinesisProxy.create(proxyProps);

    // dig the effective client configuration out of the proxy's internal client
    final AmazonKinesis internalClient = Whitebox.getInternalState(kinesisProxy, "kinesisClient");
    final ClientConfiguration effectiveConfig =
        Whitebox.getInternalState(internalClient, "clientConfiguration");

    assertEquals(9999, effectiveConfig.getSocketTimeout());
}
示例14
/**
 * Checks that a non-numeric {@code RateLimit} value makes producer configuration validation fail
 * with an {@link IllegalArgumentException}.
 */
@Test
public void testUnparsableLongForProducerConfiguration() {
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    producerProps.setProperty("RateLimit", "unparsableLong");

    exception.expect(IllegalArgumentException.class);
    exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");

    KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
}
示例15
/**
 * Checks that the rate limit is 100 when not configured and follows the user-supplied value
 * once {@code KinesisConfigUtil.RATE_LIMIT} is set.
 */
@Test
public void testRateLimitInProducerConfiguration() {
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // no explicit rate limit
    final KinesisProducerConfiguration withDefault =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(100, withDefault.getRateLimit());

    // explicit rate limit
    producerProps.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
    final KinesisProducerConfiguration withOverride =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(150, withOverride.getRateLimit());
}
示例16
/**
 * Checks that the threading model is {@code POOLED} when not configured and follows the
 * user-supplied value once {@code KinesisConfigUtil.THREADING_MODEL} is set.
 */
@Test
public void testThreadingModelInProducerConfiguration() {
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // no explicit threading model
    final KinesisProducerConfiguration withDefault =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, withDefault.getThreadingModel());

    // explicit threading model
    producerProps.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
    final KinesisProducerConfiguration withOverride =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, withOverride.getThreadingModel());
}
示例17
/**
 * Checks that the thread pool size is 10 when not configured and follows the user-supplied
 * value once {@code KinesisConfigUtil.THREAD_POOL_SIZE} is set.
 */
@Test
public void testThreadPoolSizeInProducerConfiguration() {
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // no explicit pool size
    final KinesisProducerConfiguration withDefault =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(10, withDefault.getThreadPoolSize());

    // explicit pool size
    producerProps.setProperty(KinesisConfigUtil.THREAD_POOL_SIZE, "12");
    final KinesisProducerConfiguration withOverride =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(12, withOverride.getThreadPoolSize());
}
示例18
/**
 * Checks that values stored under the deprecated {@code ProducerConfigConstants} keys are
 * available under the corresponding {@code KinesisConfigUtil} keys after replacement.
 */
@Test
public void testReplaceDeprecatedKeys() {
    final Properties legacyProps = new Properties();
    legacyProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // deprecated keys that are expected to be replaced
    legacyProps.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1");
    legacyProps.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2");

    final Properties migratedProps = KinesisConfigUtil.replaceDeprecatedProducerKeys(legacyProps);

    final String aggregationMaxCount = migratedProps.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT);
    final String collectionMaxCount = migratedProps.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT);
    assertEquals("1", aggregationMaxCount);
    assertEquals("2", collectionMaxCount);
}
示例19
/**
 * Checks that the configured AWS region is carried over into the producer configuration.
 */
@Test
public void testCorrectlySetRegionInProducerConfiguration() {
    final String expectedRegion = "us-east-1";

    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, expectedRegion);

    final KinesisProducerConfiguration producerConfig =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);

    assertEquals("incorrect region", expectedRegion, producerConfig.getRegion());
}
示例20
/**
 * Checks that producer configuration validation fails with an {@link IllegalArgumentException}
 * when credentials are set but the AWS region is missing.
 */
@Test
public void testMissingAwsRegionInProducerConfig() {
    // credentials only, deliberately no region
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
    producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

    final String missingRegionMessage = String.format(
        "For FlinkKinesisProducer AWS region ('%s') must be set in the config.",
        AWSConfigConstants.AWS_REGION);
    exception.expect(IllegalArgumentException.class);
    exception.expectMessage(missingRegionMessage);

    KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
}
示例21
/**
 * Checks that AWS configuration validation rejects an unknown region id with an
 * {@link IllegalArgumentException}.
 */
@Test
public void testUnrecognizableAwsRegionInConfig() {
    final Properties awsProps = new Properties();
    awsProps.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
    awsProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    awsProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

    exception.expect(IllegalArgumentException.class);
    exception.expectMessage("Invalid AWS region");

    KinesisConfigUtil.validateAwsConfiguration(awsProps);
}
示例22
/**
 * Checks that AWS configuration validation fails with an {@link IllegalArgumentException} when
 * the BASIC credential provider is selected but neither access key id nor secret key is set.
 */
@Test
public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
    // BASIC provider selected, but no credentials supplied
    final Properties awsProps = new Properties();
    awsProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    awsProps.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");

    final String missingCredentialsMessage =
        "Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
        "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.";
    exception.expect(IllegalArgumentException.class);
    exception.expectMessage(missingCredentialsMessage);

    KinesisConfigUtil.validateAwsConfiguration(awsProps);
}
示例23
/**
 * Checks that AWS configuration validation rejects an unknown credential provider type with an
 * {@link IllegalArgumentException}.
 */
@Test
public void testUnrecognizableCredentialProviderTypeInConfig() {
    // standard properties with an invalid provider type on top
    final Properties awsProps = TestUtils.getStandardProperties();
    awsProps.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");

    exception.expect(IllegalArgumentException.class);
    exception.expectMessage("Invalid AWS Credential Provider Type");

    KinesisConfigUtil.validateAwsConfiguration(awsProps);
}
示例24
/**
 * Checks that consumer configuration validation fails with an {@link IllegalArgumentException}
 * when both the AWS region and the AWS endpoint are set.
 */
@Test
public void testAwsRegionOrEndpointInConsumerConfig() {
    // region AND endpoint are both present
    final Properties consumerProps = new Properties();
    consumerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    consumerProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, "fake");
    consumerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
    consumerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

    final String regionOrEndpointMessage = String.format(
        "For FlinkKinesisConsumer either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
        AWSConfigConstants.AWS_REGION,
        AWSConfigConstants.AWS_ENDPOINT);
    exception.expect(IllegalArgumentException.class);
    exception.expectMessage(regionOrEndpointMessage);

    KinesisConfigUtil.validateConsumerConfiguration(consumerProps);
}
示例25
/**
 * Builds a fresh set of standard Kinesis-related config properties: region {@code us-east-1}
 * together with placeholder access key id and secret key values.
 *
 * @return a new, independent {@code Properties} instance
 */
public static Properties getStandardProperties() {
    final Properties standardProps = new Properties();

    // placeholder credentials
    standardProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    standardProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

    standardProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    return standardProps;
}
示例26
/**
 * Validates the configuration properties for {@link FlinkKinesisProducer} and builds the
 * corresponding KinesisProducerConfiguration.
 *
 * <p>Rate limit, threading model and thread pool size fall back to this class's defaults
 * whenever the user did not specify them.
 *
 * @param config the producer properties; must not be {@code null} and must contain the AWS region
 * @return the producer configuration derived from {@code config}
 * @throws IllegalArgumentException if the AWS region is not set in {@code config}
 */
public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) {
    checkNotNull(config, "config can not be null");
    validateAwsConfiguration(config);

    // the Amazon Kinesis Producer Library requires a region
    final boolean regionConfigured = config.containsKey(AWSConfigConstants.AWS_REGION);
    if (!regionConfigured) {
        final String message = String.format(
            "For FlinkKinesisProducer AWS region ('%s') must be set in the config.",
            AWSConfigConstants.AWS_REGION);
        throw new IllegalArgumentException(message);
    }

    final KinesisProducerConfiguration producerConfig = KinesisProducerConfiguration.fromProperties(config);
    producerConfig.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));
    producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));

    // The credential refresh delay is explicitly lowered (default is 5 seconds) to avoid an
    // ignorable interruption warning that occurs when shutting down the KPL client.
    // See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
    producerConfig.setCredentialsRefreshDelay(100);

    // apply our defaults only for settings the user left untouched
    final boolean userSetRateLimit = config.containsKey(RATE_LIMIT);
    if (!userSetRateLimit) {
        producerConfig.setRateLimit(DEFAULT_RATE_LIMIT);
    }
    final boolean userSetThreadingModel = config.containsKey(THREADING_MODEL);
    if (!userSetThreadingModel) {
        producerConfig.setThreadingModel(DEFAULT_THREADING_MODEL);
    }
    final boolean userSetThreadPoolSize = config.containsKey(THREAD_POOL_SIZE);
    if (!userSetThreadPoolSize) {
        producerConfig.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
    }

    return producerConfig;
}
示例27
/**
 * Checks that a property prefixed with {@code AWSUtil.AWS_CLIENT_CONFIG_PREFIX} ends up in the
 * client configuration of the Kinesis client held by the proxy.
 */
@Test
public void testClientConfigOverride() {
    final String socketTimeoutKey = AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "socketTimeout";

    final Properties proxyProps = new Properties();
    proxyProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    proxyProps.setProperty(socketTimeoutKey, "9999");

    final KinesisProxyInterface kinesisProxy = KinesisProxy.create(proxyProps);

    // dig the effective client configuration out of the proxy's internal client
    final AmazonKinesis internalClient = Whitebox.getInternalState(kinesisProxy, "kinesisClient");
    final ClientConfiguration effectiveConfig =
        Whitebox.getInternalState(internalClient, "clientConfiguration");

    assertEquals(9999, effectiveConfig.getSocketTimeout());
}
示例28
/**
 * Checks that a non-numeric {@code RateLimit} value makes producer configuration validation fail
 * with an {@link IllegalArgumentException}.
 */
@Test
public void testUnparsableLongForProducerConfiguration() {
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    producerProps.setProperty("RateLimit", "unparsableLong");

    exception.expect(IllegalArgumentException.class);
    exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");

    KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
}
示例29
/**
 * Checks that the rate limit is 100 when not configured and follows the user-supplied value
 * once {@code KinesisConfigUtil.RATE_LIMIT} is set.
 */
@Test
public void testRateLimitInProducerConfiguration() {
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // no explicit rate limit
    final KinesisProducerConfiguration withDefault =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(100, withDefault.getRateLimit());

    // explicit rate limit
    producerProps.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
    final KinesisProducerConfiguration withOverride =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(150, withOverride.getRateLimit());
}
示例30
/**
 * Checks that the threading model is {@code POOLED} when not configured and follows the
 * user-supplied value once {@code KinesisConfigUtil.THREADING_MODEL} is set.
 */
@Test
public void testThreadingModelInProducerConfiguration() {
    final Properties producerProps = new Properties();
    producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // no explicit threading model
    final KinesisProducerConfiguration withDefault =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, withDefault.getThreadingModel());

    // explicit threading model
    producerProps.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
    final KinesisProducerConfiguration withOverride =
        KinesisConfigUtil.getValidatedProducerConfiguration(producerProps);
    assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, withOverride.getThreadingModel());
}