Java源码示例:com.amazonaws.services.sqs.model.GetQueueAttributesResult
示例1
/**
 * Answers a GetQueueAttributes call for this virtual queue by querying the host queue and
 * then overlaying the virtual-queue attributes that were asked for.
 *
 * <p>The host-queue URL attribute is added when it was requested by name or via "All".
 * The idle-queue retention period is added when one is configured and it was requested by
 * name or via "All".
 *
 * @param request the caller's request; its attribute-name list is left unmodified
 * @return the host queue's result, augmented with the requested virtual-queue attributes
 */
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest request) {
    // Work on a private copy: removing the virtual-only attribute directly from the list
    // returned by the request would silently alter the object the caller still holds.
    List<String> attributeNames = new java.util.ArrayList<>(request.getAttributeNames());

    boolean includeHostQueue =
            attributeNames.remove(VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE) ||
            attributeNames.contains("All");
    boolean includeRetentionPeriod = retentionPeriod.isPresent() &&
            (attributeNames.contains(IDLE_QUEUE_RETENTION_PERIOD) ||
             attributeNames.contains("All"));

    GetQueueAttributesRequest hostQueueRequest = new GetQueueAttributesRequest()
            .withQueueUrl(hostQueue.queueUrl)
            .withAttributeNames(attributeNames);
    GetQueueAttributesResult result = amazonSqsToBeExtended.getQueueAttributes(hostQueueRequest);

    if (includeHostQueue) {
        result.getAttributes().put(VIRTUAL_QUEUE_HOST_QUEUE_ATTRIBUTE, hostQueue.queueUrl);
    }
    if (includeRetentionPeriod) {
        result.getAttributes().put(IDLE_QUEUE_RETENTION_PERIOD, retentionPeriod.get().toString());
    }
    return result;
}
示例2
/**
 * Checks that a client from {@code AmazonSQSTemporaryQueuesClientBuilder} reports the
 * "IdleQueueRetentionPeriodSeconds" attribute for a host queue, and both that attribute and
 * "HostQueueUrl" for a queue created with a "HostQueueUrl" attribute pointing at it.
 */
@Test
public void supportsBothFeatures() {
    AmazonSQS client = AmazonSQSTemporaryQueuesClientBuilder.standard()
            .withQueuePrefix(queueNamePrefix)
            .build();

    // Host queue with a retention period.
    CreateQueueRequest hostQueueRequest = new CreateQueueRequest()
            .withQueueName(queueNamePrefix + "Host")
            .addAttributesEntry("IdleQueueRetentionPeriodSeconds", "300");
    String hostQueueUrl = client.createQueue(hostQueueRequest).getQueueUrl();

    GetQueueAttributesResult hostAttributes = client.getQueueAttributes(
            hostQueueUrl, Collections.singletonList("IdleQueueRetentionPeriodSeconds"));
    assertEquals("300", hostAttributes.getAttributes().get("IdleQueueRetentionPeriodSeconds"));

    // Queue that names the host queue above and carries its own retention period.
    CreateQueueRequest virtualQueueRequest = new CreateQueueRequest()
            .withQueueName(queueNamePrefix + "VirtualQueue")
            .addAttributesEntry("HostQueueUrl", hostQueueUrl)
            .addAttributesEntry("IdleQueueRetentionPeriodSeconds", "300");
    String virtualQueueUrl = client.createQueue(virtualQueueRequest).getQueueUrl();

    GetQueueAttributesResult virtualAttributes = client.getQueueAttributes(
            virtualQueueUrl, Arrays.asList("HostQueueUrl", "IdleQueueRetentionPeriodSeconds"));
    assertEquals(hostQueueUrl, virtualAttributes.getAttributes().get("HostQueueUrl"));
    assertEquals("300", virtualAttributes.getAttributes().get("IdleQueueRetentionPeriodSeconds"));
}
示例3
/**
 * Same scenario as the two-feature test, but with the client built using an idle-queue
 * sweeping period of 0 minutes: the host queue must still report
 * "IdleQueueRetentionPeriodSeconds", and the queue created on top of it must report both
 * "HostQueueUrl" and "IdleQueueRetentionPeriodSeconds".
 */
@Test
public void supportsTurningOffIdleQueueSweeping() {
    AmazonSQS client = AmazonSQSTemporaryQueuesClientBuilder.standard()
            .withQueuePrefix(queueNamePrefix)
            .withIdleQueueSweepingPeriod(0, TimeUnit.MINUTES)
            .build();

    // Host queue with a retention period.
    CreateQueueRequest hostQueueRequest = new CreateQueueRequest()
            .withQueueName(queueNamePrefix + "Host")
            .addAttributesEntry("IdleQueueRetentionPeriodSeconds", "300");
    String hostQueueUrl = client.createQueue(hostQueueRequest).getQueueUrl();

    GetQueueAttributesResult hostAttributes = client.getQueueAttributes(
            hostQueueUrl, Collections.singletonList("IdleQueueRetentionPeriodSeconds"));
    assertEquals("300", hostAttributes.getAttributes().get("IdleQueueRetentionPeriodSeconds"));

    // Queue that names the host queue above and carries its own retention period.
    CreateQueueRequest virtualQueueRequest = new CreateQueueRequest()
            .withQueueName(queueNamePrefix + "VirtualQueue")
            .addAttributesEntry("HostQueueUrl", hostQueueUrl)
            .addAttributesEntry("IdleQueueRetentionPeriodSeconds", "300");
    String virtualQueueUrl = client.createQueue(virtualQueueRequest).getQueueUrl();

    GetQueueAttributesResult virtualAttributes = client.getQueueAttributes(
            virtualQueueUrl, Arrays.asList("HostQueueUrl", "IdleQueueRetentionPeriodSeconds"));
    assertEquals(hostQueueUrl, virtualAttributes.getAttributes().get("HostQueueUrl"));
    assertEquals("300", virtualAttributes.getAttributes().get("IdleQueueRetentionPeriodSeconds"));
}
示例4
/**
 * Builds the fixture: a mocked SQS client and executor, plus a stub that answers a
 * GetQueueAttributes request for the wait-time and visibility-timeout attributes of the
 * test queue with the values "20" and "30".
 */
public ReceiveQueueBufferTest() {
    this.sqs = mock(AmazonSQS.class);
    this.executor = mock(ScheduledExecutorService.class);
    this.queueUrl = "http://queue.amazon.com/123456789012/MyQueue";

    String waitTimeKey = QueueAttributeName.ReceiveMessageWaitTimeSeconds.toString();
    String visibilityKey = QueueAttributeName.VisibilityTimeout.toString();

    Map<String, String> cannedValues = new HashMap<>();
    cannedValues.put(waitTimeKey, "20");
    cannedValues.put(visibilityKey, "30");
    GetQueueAttributesResult cannedResult = new GetQueueAttributesResult()
            .withAttributes(cannedValues);

    List<String> requestedNames = Arrays.asList(waitTimeKey, visibilityKey);
    GetQueueAttributesRequest expectedRequest = new GetQueueAttributesRequest()
            .withQueueUrl(queueUrl)
            .withAttributeNames(requestedNames);

    when(sqs.getQueueAttributes(eq(expectedRequest))).thenReturn(cannedResult);
}
示例5
/**
 * Fetches the {@code QUEUELENGTHATTR} and {@code QUEUEINVISIBLEATTR} attributes of the
 * queue and, for each one present in the response, publishes it as a health-check metric
 * and logs it.
 *
 * <p>Any exception is logged at warn level (root-cause message plus full stack trace) and
 * not rethrown.
 */
private void checkQueueLength() {
    try {
        GetQueueAttributesResult result = sqsClient.getQueueAttributes(
                queueUrl, Arrays.asList(QUEUELENGTHATTR, QUEUEINVISIBLEATTR));
        Map<String, String> attrs = result.getAttributes();

        if (attrs.containsKey(QUEUELENGTHATTR)) {
            Stats.addMetric(StatsUtil.getStatsName("healthcheck", "ec2queue_length"),
                    Integer.parseInt(attrs.get(QUEUELENGTHATTR)));
            logger.info("Ec2 queue length is {}", attrs.get(QUEUELENGTHATTR));
        }

        if (attrs.containsKey(QUEUEINVISIBLEATTR)) {
            // Read through the same constant that was requested and checked above, rather
            // than a separately hard-coded attribute name that could drift from it.
            Stats.addMetric(StatsUtil.getStatsName("healthcheck", "ec2queue_in_processing"),
                    Integer.parseInt(attrs.get(QUEUEINVISIBLEATTR)));
            logger.info("Ec2 queue in processing length is {}", attrs.get(QUEUEINVISIBLEATTR));
        }
    } catch (Exception ex) {
        logger.warn(ExceptionUtils.getRootCauseMessage(ex));
        logger.warn(ExceptionUtils.getFullStackTrace(ex));
    }
}
示例6
/**
 * Applies the configured {@code permissions} to the queue. Does nothing when
 * {@code permissions} is null or empty. Otherwise the queue's "Policy" attribute is fetched
 * and handed to {@code AwsUtil.addPermissions} together with a handler that issues one
 * AddPermission call per permission passed to it.
 */
private void addPermissions() {
    if (permissions == null || permissions.isEmpty()) {
        return;
    }

    GetQueueAttributesRequest policyRequest =
            new GetQueueAttributesRequest(queueUrl, Arrays.asList("Policy"));
    GetQueueAttributesResult policyResult = sqsClient.getQueueAttributes(policyRequest);

    AwsUtil.addPermissions(policyResult.getAttributes(), permissions,
            new AwsUtil.AddPermissionHandler() {
                @Override
                public void execute(Permission p) {
                    AddPermissionRequest grant = new AddPermissionRequest()
                            .withQueueUrl(queueUrl)
                            .withLabel(p.getLabel())
                            .withAWSAccountIds(p.getAwsAccountIds())
                            .withActions(p.getActions());
                    sqsClient.addPermission(grant);
                }
            });
}
示例7
/**
 * Serves the request from the locally tracked queue metadata when the queue URL is known,
 * returning only those tracked attributes whose names appear in the request. Unknown queue
 * URLs are delegated to the superclass.
 */
@Override
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest request) {
    QueueMetadata tracked = queues.get(request.getQueueUrl());
    if (tracked == null) {
        return super.getQueueAttributes(request);
    }

    // Copy first so that filtering never touches the tracked attribute map itself.
    Map<String, String> selected = new HashMap<>(tracked.attributes);
    selected.keySet().retainAll(request.getAttributeNames());
    return new GetQueueAttributesResult().withAttributes(selected);
}
示例8
/**
 * Reports whether all three approximate message counters of a queue (visible, delayed and
 * not visible) are zero.
 *
 * @param sqs      client used to query the queue
 * @param queueUrl URL of the queue to inspect
 * @return {@code true} only if every counter parses to 0
 */
public static boolean isQueueEmpty(AmazonSQS sqs, String queueUrl) {
    QueueAttributeName[] counters = {
        QueueAttributeName.ApproximateNumberOfMessages,
        QueueAttributeName.ApproximateNumberOfMessagesDelayed,
        QueueAttributeName.ApproximateNumberOfMessagesNotVisible
    };

    GetQueueAttributesResult result = sqs.getQueueAttributes(
            new GetQueueAttributesRequest()
                    .withQueueUrl(queueUrl)
                    .withAttributeNames(counters));
    Map<String, String> counts = result.getAttributes();

    // Stop at the first non-zero counter, in declaration order.
    for (QueueAttributeName counter : counters) {
        if (Long.parseLong(counts.get(counter.name())) != 0) {
            return false;
        }
    }
    return true;
}
示例9
/**
 * Builds metadata whose payload is the set of ARNs of all queues returned by
 * {@code listQueues()} for the supplied credentials and region.
 *
 * @param parameters connector options; "accessKey", "secretKey" and "region" are read
 * @return metadata with content type "text/plain", Java type {@code String} and the set of
 *         queue ARNs as payload
 * @throws IllegalStateException if listing the queues or reading their attributes fails
 */
@Override
public Optional<MetaData> meta(Map<String, Object> parameters) {
    final String accessKey = ConnectorOptions.extractOption(parameters, "accessKey");
    final String secretKey = ConnectorOptions.extractOption(parameters, "secretKey");
    final String region = ConnectorOptions.extractOption(parameters, "region");

    AmazonSQSClientBuilder clientBuilder;
    AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
    AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
    clientBuilder = AmazonSQSClientBuilder.standard().withCredentials(credentialsProvider);
    clientBuilder = clientBuilder.withRegion(Regions.valueOf(region));
    AmazonSQS sqsClient = clientBuilder.build();

    List<String> attributeNames = new ArrayList<String>();
    attributeNames.add("All");
    try {
        ListQueuesResult result = sqsClient.listQueues();
        Set<String> setQueue = new HashSet<String>();
        if (result.getQueueUrls() != null) {
            for (String entry : result.getQueueUrls()) {
                GetQueueAttributesRequest req = new GetQueueAttributesRequest();
                req.setQueueUrl(entry);
                req.setAttributeNames(attributeNames);
                GetQueueAttributesResult c = sqsClient.getQueueAttributes(req);
                setQueue.add(c.getAttributes().get(QueueAttributeName.QueueArn.name()));
            }
        }
        return Optional.of(MetaDataBuilder.on(getCamelContext())
                .withAttribute(MetaData.CONTENT_TYPE, "text/plain")
                .withAttribute(MetaData.JAVA_TYPE, String.class)
                .withPayload(setQueue)
                .build());
    } catch (Exception e) {
        throw new IllegalStateException("Get information about existing queues with has failed.", e);
    } finally {
        // The client is created for this call only; release it on every exit path so it
        // does not outlive the call.
        sqsClient.shutdown();
    }
}
示例10
/**
 * Returns the queue's "ApproximateNumberOfMessages" attribute as a number.
 *
 * @return the parsed attribute value, or -1 when the value cannot be parsed
 */
@Override
public long size() {
    GetQueueAttributesResult result = client.getQueueAttributes(
            queueURL, Collections.singletonList("ApproximateNumberOfMessages"));
    String rawCount = result.getAttributes().get("ApproximateNumberOfMessages");

    long parsed;
    try {
        parsed = Long.parseLong(rawCount);
    } catch (Exception e) {
        // A missing or non-numeric value is reported as -1.
        parsed = -1;
    }
    return parsed;
}
示例11
/**
 * For retrieving vault inventory: initializes the SQS queue used for determining when a
 * job has completed. Does nothing if member sqsQueueName is null. Otherwise creates the
 * queue through sqsClient, sets members sqsQueueURL and sqsQueueARN, and installs a queue
 * policy that allows SendMessage on the queue.
 */
private void setupSQS() {
    // Nothing to set up without a queue name.
    if (sqsQueueName == null) {
        return;
    }

    CreateQueueResult created = sqsClient.createQueue(
            new CreateQueueRequest().withQueueName(sqsQueueName));
    sqsQueueURL = created.getQueueUrl();

    GetQueueAttributesResult arnLookup = sqsClient.getQueueAttributes(
            new GetQueueAttributesRequest()
                    .withQueueUrl(sqsQueueURL)
                    .withAttributeNames("QueueArn"));
    sqsQueueARN = arnLookup.getAttributes().get("QueueArn");

    // NOTE(review): the statement grants SendMessage to Principal.AllUsers; confirm that
    // such an open policy is intended.
    Statement allowSend = new Statement(Effect.Allow)
            .withPrincipals(Principal.AllUsers)
            .withActions(SQSActions.SendMessage)
            .withResources(new Resource(sqsQueueARN));
    Policy sqsPolicy = new Policy().withStatements(allowSend);

    Map<String, String> policyAttribute = new HashMap<String, String>();
    policyAttribute.put("Policy", sqsPolicy.toJson());
    sqsClient.setQueueAttributes(new SetQueueAttributesRequest(sqsQueueURL, policyAttribute));
}
示例12
/**
 * Stubs the given mock so that a GetQueueAttributes request for the RedrivePolicy attribute
 * of {@code queueUrl} returns a result containing a small JSON redrive policy.
 */
private static void mockGetQueueAttributesWithRedrivePolicy(AmazonSQSAsync sqs,
        String queueUrl) {
    GetQueueAttributesRequest expectedRequest = new GetQueueAttributesRequest(queueUrl)
            .withAttributeNames(QueueAttributeName.RedrivePolicy);
    GetQueueAttributesResult cannedResult = new GetQueueAttributesResult()
            .addAttributesEntry(QueueAttributeName.RedrivePolicy.toString(),
                    "{\"some\": \"JSON\"}");

    when(sqs.getQueueAttributes(expectedRequest)).thenReturn(cannedResult);
}
示例13
/**
 * With one handler mapping and no explicit task executor, the container is expected to
 * create a {@code ThreadPoolTaskExecutor} whose maximum pool size equals
 * (number of handler mappings) * (max number of messages + 1).
 */
@Test
void testWithDefaultTaskExecutorAndOneHandler() throws Exception {
    int maxMessages = 10;

    QueueMessageHandler.MappingInformation mapping =
            new QueueMessageHandler.MappingInformation(
                    Collections.singleton("testQueue"),
                    SqsMessageDeletionPolicy.ALWAYS);
    Map<QueueMessageHandler.MappingInformation, HandlerMethod> handlerMethods =
            Collections.singletonMap(mapping, null);

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

    QueueMessageHandler handlerMock = mock(QueueMessageHandler.class);
    AmazonSQSAsync sqsMock = mock(AmazonSQSAsync.class, withSettings().stubOnly());

    when(sqsMock.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());
    when(sqsMock.getQueueUrl(any(GetQueueUrlRequest.class)))
            .thenReturn(new GetQueueUrlResult().withQueueUrl("testQueueUrl"));
    when(handlerMock.getHandlerMethods()).thenReturn(handlerMethods);

    container.setMaxNumberOfMessages(maxMessages);
    container.setAmazonSqs(sqsMock);
    container.setMessageHandler(handlerMock);
    container.afterPropertiesSet();

    int expectedPoolMaxSize = handlerMethods.size() * (maxMessages + 1);

    ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) container.getTaskExecutor();
    assertThat(executor).isNotNull();
    assertThat(executor.getMaxPoolSize()).isEqualTo(expectedPoolMaxSize);
}
示例14
/**
 * Provides a stub-only {@code AmazonSQSAsync} mock that resolves "testQueue" to a fixed URL
 * and answers every receiveMessage and getQueueAttributes call with an empty result.
 */
@Bean
AmazonSQSAsync amazonSQS() {
    AmazonSQSAsync sqsMock = mock(AmazonSQSAsync.class, withSettings().stubOnly());

    mockGetQueueUrl(sqsMock, "testQueue", "http://testQueue.amazonaws.com");

    ReceiveMessageResult noMessages = new ReceiveMessageResult();
    when(sqsMock.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(noMessages);

    GetQueueAttributesResult noAttributes = new GetQueueAttributesResult();
    when(sqsMock.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(noAttributes);

    return sqsMock;
}
示例15
/**
 * With a single registered listener bean, starting the container must register "testQueue"
 * with a receive request that carries the resolved queue URL and the configured
 * max-number-of-messages, visibility-timeout and wait-time values.
 */
@Test
void receiveMessageRequests_withOneElement_created() throws Exception {
    AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer();

    AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    QueueMessageHandler messageHandler = new QueueMessageHandler();
    container.setAmazonSqs(mock);
    // The handler is set exactly once; an extra mock handler that the next setter call
    // replaced right away was dropped as dead setup.
    container.setMessageHandler(messageHandler);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("messageListener", MessageListener.class);

    container.setMaxNumberOfMessages(11);
    container.setVisibilityTimeout(22);
    container.setWaitTimeOut(33);

    messageHandler.setApplicationContext(applicationContext);

    when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue")))
            .thenReturn(new GetQueueUrlResult()
                    .withQueueUrl("http://testQueue.amazonaws.com"));
    when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    container.start();

    Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues();
    QueueAttributes testQueue = registeredQueues.get("testQueue");
    assertThat(testQueue.getReceiveMessageRequest().getQueueUrl())
            .isEqualTo("http://testQueue.amazonaws.com");
    assertThat(testQueue.getReceiveMessageRequest().getMaxNumberOfMessages().longValue())
            .isEqualTo(11L);
    assertThat(testQueue.getReceiveMessageRequest().getVisibilityTimeout().longValue())
            .isEqualTo(22L);
    assertThat(testQueue.getReceiveMessageRequest().getWaitTimeSeconds().longValue())
            .isEqualTo(33L);
}
示例16
/**
 * Looks up the {@code QUEUE_ARN_KEY} attribute of the queue at {@code queueUrl} and stores
 * the returned value in {@code queueArn}.
 */
private void resolveQueueArn() {
    GetQueueAttributesRequest arnRequest = new GetQueueAttributesRequest(queueUrl)
            .withAttributeNames(Collections.singletonList(QUEUE_ARN_KEY));
    GetQueueAttributesResult arnResult = sqsClient.getQueueAttributes(arnRequest);
    queueArn = arnResult.getAttributes().get(QUEUE_ARN_KEY);
}
示例17
/**
 * Routes the request to the virtual queue that {@code getVirtualQueue} finds for the
 * request's queue URL; when there is none, forwards the request to the wrapped client.
 */
@Override
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest request) {
    String targetUrl = request.getQueueUrl();
    return getVirtualQueue(targetUrl)
            .map(queue -> queue.getQueueAttributes(request))
            .orElseGet(() -> amazonSqsToBeExtended.getQueueAttributes(request));
}
示例18
/**
 * Appends this client's user agent to the request's client options and then delegates the
 * call to the wrapped client.
 */
@Override
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest request) {
    request.getRequestClientOptions().appendUserAgent(userAgent);

    GetQueueAttributesResult delegated = amazonSqsToBeExtended.getQueueAttributes(request);
    return delegated;
}
示例19
/**
 * Collects one gauge family per SQS queue attribute, with one sample per queue labelled by
 * queue name.
 *
 * <p>The queues are taken from the first of these that is set: the comma-separated
 * {@code SQS_QUEUE_URLS} environment variable, the comma-separated {@code SQS_QUEUE_NAMES}
 * variable (each name resolved to its URL), or all queues listed for the optional
 * {@code SQS_QUEUE_NAME_PREFIX}. The SQS client is created lazily on first use.
 *
 * <p>If an {@code AmazonClientException} occurs, the error is logged, the client is shut
 * down and cleared so the next call reconnects, and whatever was collected so far is
 * returned.
 *
 * @return the gauge families gathered in this pass (possibly empty or partial)
 */
@Override
public List<MetricFamilySamples> collect() {
    List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
    // One family per metric name, shared by all queues. Emitting a separate family with
    // the same name for every queue would produce duplicate metric families.
    Map<String, GaugeMetricFamily> gaugesByName =
            new java.util.HashMap<String, GaugeMetricFamily>();

    try {
        if (sqs == null) {
            String region = new DefaultAwsRegionProviderChain().getRegion();
            sqs = AmazonSQSClientBuilder
                    .standard()
                    .withRegion(region)
                    .build();
            logger.info("AmazonSQS client is connected to region: ({})", region);
        }

        List<String> queueUrls;

        // check for manually-specified queue name filters
        String queueUrlsFromEnv = System.getenv("SQS_QUEUE_URLS");
        String queueNames = System.getenv("SQS_QUEUE_NAMES");
        String queueNamePrefix = System.getenv("SQS_QUEUE_NAME_PREFIX");
        if (queueUrlsFromEnv != null) {
            queueUrls = new ArrayList<String>(Arrays.asList(queueUrlsFromEnv.split(",")));
        } else if (queueNames != null) {
            // find the URLs for the named queues
            queueUrls = new ArrayList<String>();
            for (String name : queueNames.split(",")) {
                queueUrls.add(sqs.getQueueUrl(name).getQueueUrl());
            }
        } else {
            // get URLs for all queues visible to this account (with prefix if specified);
            // if null is passed in, the whole unfiltered list is returned
            ListQueuesResult queues = sqs.listQueues(queueNamePrefix);
            queueUrls = queues.getQueueUrls();
        }

        for (String qUrl : queueUrls) {
            // The queue name is the last path segment of the queue URL.
            String[] tokens = qUrl.split("\\/");
            String queueName = tokens[tokens.length - 1];

            GetQueueAttributesResult attr = sqs.getQueueAttributes(qUrl, attributeNames);
            Map<String, String> qAttributes = attr.getAttributes();
            for (Map.Entry<String, String> attribute : qAttributes.entrySet()) {
                String key = attribute.getKey();
                // Locale.ROOT keeps the metric name stable regardless of the JVM's default
                // locale (e.g. Turkish dotless-i lower-casing).
                String metricName = String.format("sqs_%s",
                        key.toLowerCase(java.util.Locale.ROOT).trim());

                GaugeMetricFamily labeledGauge = gaugesByName.get(metricName);
                if (labeledGauge == null) {
                    labeledGauge = new GaugeMetricFamily(
                            metricName,
                            attributeDescriptions.get(key),
                            Arrays.asList("queue"));
                    gaugesByName.put(metricName, labeledGauge);
                    // Register immediately so partial results survive a later failure.
                    mfs.add(labeledGauge);
                }
                labeledGauge.addMetric(Arrays.asList(queueName),
                        Double.valueOf(attribute.getValue()));
            }
        }
    } catch (AmazonClientException e) {
        logger.error(e.getMessage());
        if (sqs != null) {
            sqs.shutdown();
        }
        sqs = null; // force reconnect
    }
    return mfs;
}
示例20
/**
 * Fetches the "QueueArn" attribute of the queue at {@code queueURL}.
 *
 * @return the value stored under "QueueArn" in the response attributes
 */
private String getQueueARN() {
    GetQueueAttributesResult arnLookup =
            client.getQueueAttributes(queueURL, Collections.singletonList("QueueArn"));
    String arn = arnLookup.getAttributes().get("QueueArn");
    return arn;
}
示例21
/**
 * Stubs the given mock so that a GetQueueAttributes request for the RedrivePolicy attribute
 * of {@code queueUrl} returns an empty result.
 */
private static void mockGetQueueAttributesWithEmptyResult(AmazonSQSAsync sqs,
        String queueUrl) {
    GetQueueAttributesRequest expectedRequest = new GetQueueAttributesRequest(queueUrl)
            .withAttributeNames(QueueAttributeName.RedrivePolicy);
    GetQueueAttributesResult emptyResult = new GetQueueAttributesResult();

    when(sqs.getQueueAttributes(expectedRequest)).thenReturn(emptyResult);
}
示例22
/**
 * Starts a container against a mocked SQS client that first returns two messages with body
 * "messageContent" and then empty results, and expects the handler to be invoked with that
 * payload within one second.
 */
@Test
void testSimpleReceiveMessage() throws Exception {
    String queueUrl = "http://testSimpleReceiveMessage.amazonaws.com";

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);

    CountDownLatch handled = new CountDownLatch(1);
    QueueMessageHandler messageHandler = new QueueMessageHandler() {
        @Override
        public void handleMessage(org.springframework.messaging.Message<?> message)
                throws MessagingException {
            handled.countDown();
            assertThat(message.getPayload()).isEqualTo("messageContent");
        }
    };
    container.setMessageHandler(messageHandler);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener", TestMessageListener.class);
    messageHandler.setApplicationContext(applicationContext);
    container.setBeanName("testContainerName");
    messageHandler.afterPropertiesSet();

    mockGetQueueUrl(sqs, "testQueue", queueUrl);
    mockGetQueueAttributesWithEmptyResult(sqs, queueUrl);

    container.afterPropertiesSet();

    ReceiveMessageRequest expectedReceive = new ReceiveMessageRequest(queueUrl)
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)
            .withWaitTimeSeconds(20);
    ReceiveMessageResult twoMessages = new ReceiveMessageResult().withMessages(
            new Message().withBody("messageContent"),
            new Message().withBody("messageContent"));
    when(sqs.receiveMessage(expectedReceive))
            .thenReturn(twoMessages)
            .thenReturn(new ReceiveMessageResult());
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    container.start();

    assertThat(handled.await(1, TimeUnit.SECONDS)).isTrue();

    container.stop();
}
示例23
/**
 * Registers two listener beans, feeds each of the two stubbed queues one message, and
 * expects both messages to be executed within two seconds and each listener bean to have
 * recorded its own message body.
 */
@Test
void listener_withMultipleMessageHandlers_shouldBeCalled() throws Exception {
    String firstQueueUrl =
            "https://listener_withMultipleMessageHandlers_shouldBeCalled.amazonaws.com";
    String secondQueueUrl =
            "https://listener_withMultipleMessageHandlers_shouldBeCalled.another.amazonaws.com";

    CountDownLatch executed = new CountDownLatch(2);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {
        @Override
        protected void executeMessage(
                org.springframework.messaging.Message<String> stringMessage) {
            executed.countDown();
            super.executeMessage(stringMessage);
        }
    };
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);

    QueueMessageHandler messageHandler = new QueueMessageHandler();
    container.setMessageHandler(messageHandler);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener", TestMessageListener.class);
    applicationContext.registerSingleton("anotherTestMessageListener",
            AnotherTestMessageListener.class);

    mockGetQueueUrl(sqs, "testQueue", firstQueueUrl);
    mockGetQueueAttributesWithEmptyResult(sqs, firstQueueUrl);
    mockGetQueueUrl(sqs, "anotherTestQueue", secondQueueUrl);
    mockGetQueueAttributesWithEmptyResult(sqs, secondQueueUrl);

    messageHandler.setApplicationContext(applicationContext);
    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();

    ReceiveMessageRequest firstReceive = new ReceiveMessageRequest(firstQueueUrl)
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)
            .withWaitTimeSeconds(20);
    when(sqs.receiveMessage(firstReceive))
            .thenReturn(new ReceiveMessageResult().withMessages(
                    new Message().withBody("messageContent")))
            .thenReturn(new ReceiveMessageResult());

    ReceiveMessageRequest secondReceive = new ReceiveMessageRequest(secondQueueUrl)
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)
            .withWaitTimeSeconds(20);
    when(sqs.receiveMessage(secondReceive))
            .thenReturn(new ReceiveMessageResult().withMessages(
                    new Message().withBody("anotherMessageContent")))
            .thenReturn(new ReceiveMessageResult());

    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    container.start();

    assertThat(executed.await(2L, TimeUnit.SECONDS)).isTrue();
    container.stop();

    TestMessageListener firstListener = applicationContext.getBean(TestMessageListener.class);
    assertThat(firstListener.getMessage()).isEqualTo("messageContent");
    AnotherTestMessageListener secondListener =
            applicationContext.getBean(AnotherTestMessageListener.class);
    assertThat(secondListener.getMessage()).isEqualTo("anotherMessageContent");
}
示例24
/**
 * A received message carrying the "SenderId", "SentTimestamp" and
 * "ApproximateFirstReceiveTimestamp" attributes must reach the handler with "SenderId" and
 * "ApproximateFirstReceiveTimestamp" as headers and with a header timestamp of 1000.
 */
@Test
void messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders()
        throws Exception {
    // Arrange
    String queueUrl =
            "https://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com";

    CountDownLatch executed = new CountDownLatch(1);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {
        @Override
        protected void executeMessage(
                org.springframework.messaging.Message<String> stringMessage) {
            executed.countDown();
            super.executeMessage(stringMessage);
        }
    };

    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);

    QueueMessageHandler messageHandler = spy(new QueueMessageHandler());
    container.setMessageHandler(messageHandler);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener", TestMessageListener.class);

    mockGetQueueUrl(sqs, "testQueue", queueUrl);
    mockGetQueueAttributesWithEmptyResult(sqs, queueUrl);

    messageHandler.setApplicationContext(applicationContext);
    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();

    HashMap<String, String> systemAttributes = new HashMap<String, String>();
    systemAttributes.put("SenderId", "ID");
    systemAttributes.put("SentTimestamp", "1000");
    systemAttributes.put("ApproximateFirstReceiveTimestamp", "2000");
    Message incoming = new Message()
            .withBody("messageContent")
            .withAttributes(systemAttributes);

    ReceiveMessageRequest expectedReceive = new ReceiveMessageRequest(queueUrl)
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)
            .withWaitTimeSeconds(20);
    when(sqs.receiveMessage(expectedReceive))
            .thenReturn(new ReceiveMessageResult().withMessages(incoming))
            .thenReturn(new ReceiveMessageResult());
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    // Act
    container.start();

    // Assert
    assertThat(executed.await(2L, TimeUnit.SECONDS)).isTrue();
    container.stop();

    verify(messageHandler).handleMessage(this.stringMessageCaptor.capture());
    assertThat(this.stringMessageCaptor.getValue().getHeaders().get("SenderId"))
            .isEqualTo("ID");
    assertThat(this.stringMessageCaptor.getValue().getHeaders().getTimestamp())
            .isEqualTo(1000L);
    assertThat(this.stringMessageCaptor.getValue().getHeaders()
            .get("ApproximateFirstReceiveTimestamp")).isEqualTo("2000");
}
示例25
/**
 * A received message whose message attributes contain a String-typed content-type entry
 * must reach the handler with that value exposed as the {@code MessageHeaders.CONTENT_TYPE}
 * header, equal to the original {@code MimeType}.
 */
@Test
void messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader()
        throws Exception {
    // Arrange
    String queueUrl =
            "https://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com";

    CountDownLatch executed = new CountDownLatch(1);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {
        @Override
        protected void executeMessage(
                org.springframework.messaging.Message<String> stringMessage) {
            executed.countDown();
            super.executeMessage(stringMessage);
        }
    };

    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);

    QueueMessageHandler messageHandler = spy(new QueueMessageHandler());
    container.setMessageHandler(messageHandler);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener", TestMessageListener.class);

    mockGetQueueUrl(sqs, "testQueue", queueUrl);
    mockGetQueueAttributesWithEmptyResult(sqs, queueUrl);

    messageHandler.setApplicationContext(applicationContext);
    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();

    MimeType mimeType = new MimeType("text", "plain", Charset.forName("UTF-8"));
    MessageAttributeValue contentTypeAttribute = new MessageAttributeValue()
            .withDataType("String")
            .withStringValue(mimeType.toString());
    Message incoming = new Message()
            .withBody("messageContent")
            .withAttributes(Collections.singletonMap("SenderId", "ID"))
            .withMessageAttributes(Collections.singletonMap(
                    MessageHeaders.CONTENT_TYPE, contentTypeAttribute));

    ReceiveMessageRequest expectedReceive = new ReceiveMessageRequest(queueUrl)
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10)
            .withWaitTimeSeconds(20);
    when(sqs.receiveMessage(expectedReceive))
            .thenReturn(new ReceiveMessageResult().withMessages(incoming))
            .thenReturn(new ReceiveMessageResult());
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    // Act
    container.start();

    // Assert
    assertThat(executed.await(2L, TimeUnit.SECONDS)).isTrue();
    container.stop();

    verify(messageHandler).handleMessage(this.stringMessageCaptor.capture());
    assertThat(this.stringMessageCaptor.getValue().getHeaders()
            .get(MessageHeaders.CONTENT_TYPE)).isEqualTo(mimeType);
}
示例26
/**
 * The first receiveMessage call throws; the container must retry, and the handler must then
 * be invoked with the payload of the subsequently returned messages within one second.
 *
 * <p>Logging is disabled for the duration of the test. The previous log level is restored
 * and the container is stopped in {@code finally} blocks, so a failing assertion cannot
 * leave logging off or the container running for later tests.
 */
@Test
void receiveMessage_throwsAnException_operationShouldBeRetried() throws Exception {
    // Arrange
    Level previous = disableLogging();
    try {
        AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
        when(amazonSqs.receiveMessage(any(ReceiveMessageRequest.class)))
                .thenThrow(new RuntimeException("Boom!"))
                .thenReturn(new ReceiveMessageResult().withMessages(
                        new Message().withBody("messageContent"),
                        new Message().withBody("messageContent")));

        CountDownLatch countDownLatch = new CountDownLatch(1);
        QueueMessageHandler messageHandler = new QueueMessageHandler() {
            @Override
            public void handleMessage(org.springframework.messaging.Message<?> message)
                    throws MessagingException {
                countDownLatch.countDown();
                assertThat(message.getPayload()).isEqualTo("messageContent");
            }
        };

        StaticApplicationContext applicationContext = new StaticApplicationContext();
        applicationContext.registerSingleton("testMessageListener",
                TestMessageListener.class);
        messageHandler.setApplicationContext(applicationContext);

        mockGetQueueUrl(amazonSqs, "testQueue",
                "https://receiveMessage_throwsAnException_operationShouldBeRetried.amazonaws.com");
        messageHandler.afterPropertiesSet();

        when(amazonSqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
                .thenReturn(new GetQueueAttributesResult());

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setBackOffTime(0);
        container.setAmazonSqs(amazonSqs);
        container.setMessageHandler(messageHandler);
        container.setAutoStartup(false);
        container.afterPropertiesSet();

        // Act
        container.start();

        try {
            // Assert
            assertThat(countDownLatch.await(1, TimeUnit.SECONDS)).isTrue();
        } finally {
            container.stop();
        }
    } finally {
        setLogLevel(previous);
    }
}
示例27
/**
 * Stub implementation: ignores the request and always returns a new, empty result.
 */
@Override
public GetQueueAttributesResult getQueueAttributes(
        GetQueueAttributesRequest getQueueAttributesRequest)
        throws AmazonClientException {
    GetQueueAttributesResult emptyResult = new GetQueueAttributesResult();
    return emptyResult;
}
示例28
/**
 * With two registered listener beans, starting the container must register both
 * "testQueue" and "anotherTestQueue", each with a receive request that carries its resolved
 * queue URL and the configured max-number-of-messages, visibility-timeout and wait-time
 * values.
 */
@Test
void receiveMessageRequests_withMultipleElements_created() throws Exception {
    AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer();

    AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(mock);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    QueueMessageHandler messageHandler = new QueueMessageHandler();
    messageHandler.setApplicationContext(applicationContext);
    container.setMessageHandler(messageHandler);
    applicationContext.registerSingleton("messageListener", MessageListener.class);
    applicationContext.registerSingleton("anotherMessageListener",
            AnotherMessageListener.class);

    container.setMaxNumberOfMessages(11);
    container.setVisibilityTimeout(22);
    container.setWaitTimeOut(33);

    GetQueueUrlRequest firstLookup = new GetQueueUrlRequest().withQueueName("testQueue");
    when(mock.getQueueUrl(firstLookup))
            .thenReturn(new GetQueueUrlResult()
                    .withQueueUrl("http://testQueue.amazonaws.com"));
    GetQueueUrlRequest secondLookup =
            new GetQueueUrlRequest().withQueueName("anotherTestQueue");
    when(mock.getQueueUrl(secondLookup))
            .thenReturn(new GetQueueUrlResult()
                    .withQueueUrl("https://anotherTestQueue.amazonaws.com"));
    when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    container.start();

    Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues();

    QueueAttributes first = registeredQueues.get("testQueue");
    assertThat(first.getReceiveMessageRequest().getQueueUrl())
            .isEqualTo("http://testQueue.amazonaws.com");
    assertThat(first.getReceiveMessageRequest().getMaxNumberOfMessages().longValue())
            .isEqualTo(11L);
    assertThat(first.getReceiveMessageRequest().getVisibilityTimeout().longValue())
            .isEqualTo(22L);
    assertThat(first.getReceiveMessageRequest().getWaitTimeSeconds().longValue())
            .isEqualTo(33L);

    QueueAttributes second = registeredQueues.get("anotherTestQueue");
    assertThat(second.getReceiveMessageRequest().getQueueUrl())
            .isEqualTo("https://anotherTestQueue.amazonaws.com");
    assertThat(second.getReceiveMessageRequest().getMaxNumberOfMessages().longValue())
            .isEqualTo(11L);
    assertThat(second.getReceiveMessageRequest().getVisibilityTimeout().longValue())
            .isEqualTo(22L);
    assertThat(second.getReceiveMessageRequest().getWaitTimeSeconds().longValue())
            .isEqualTo(33L);
}
示例29
/**
 * When resolving "testQueue" throws a {@code DestinationResolutionException}, the container
 * must log a warning naming the ignored queue, must not register "testQueue", and must
 * still register "anotherTestQueue" with its resolved URL.
 */
@Test
void receiveMessageRequests_withDestinationResolverThrowingException_shouldLogWarningAndNotCreateRequest()
        throws Exception {
    // Arrange
    AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer();
    Logger loggerMock = container.getLogger();

    AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(mock);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    QueueMessageHandler messageHandler = new QueueMessageHandler();
    messageHandler.setApplicationContext(applicationContext);
    container.setMessageHandler(messageHandler);
    applicationContext.registerSingleton("messageListener", MessageListener.class);
    applicationContext.registerSingleton("anotherMessageListener",
            AnotherMessageListener.class);

    String failureMessage = "Queue not found";

    GetQueueUrlRequest failingLookup = new GetQueueUrlRequest().withQueueName("testQueue");
    when(mock.getQueueUrl(failingLookup))
            .thenThrow(new DestinationResolutionException(failureMessage));
    GetQueueUrlRequest workingLookup =
            new GetQueueUrlRequest().withQueueName("anotherTestQueue");
    when(mock.getQueueUrl(workingLookup))
            .thenReturn(new GetQueueUrlResult()
                    .withQueueUrl("https://anotherTestQueue.amazonaws.com"));
    when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();

    // Act
    container.start();

    // Assert
    ArgumentCaptor<String> warning = ArgumentCaptor.forClass(String.class);
    verify(loggerMock).warn(warning.capture());

    Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues();
    assertThat(registeredQueues.containsKey("testQueue")).isFalse();
    assertThat(warning.getValue())
            .isEqualTo("Ignoring queue with name 'testQueue': " + failureMessage);

    QueueAttributes resolved = registeredQueues.get("anotherTestQueue");
    assertThat(resolved.getReceiveMessageRequest().getQueueUrl())
            .isEqualTo("https://anotherTestQueue.amazonaws.com");
}
示例30
/**
 * Delegates to the underlying {@code resource}.
 *
 * @param request   the GetQueueAttributes request to load with
 * @param extractor passed through to the underlying resource
 * @return whatever the underlying resource's {@code load} returns
 */
@Override
public boolean load(GetQueueAttributesRequest request,
        ResultCapture<GetQueueAttributesResult> extractor) {
    boolean loaded = resource.load(request, extractor);
    return loaded;
}