From ec0677ba5ab95886f79bba8adfd9dfb6920de6f7 Mon Sep 17 00:00:00 2001 From: Robin Salkeld Date: Thu, 10 Aug 2017 12:25:09 -0700 Subject: [PATCH] 'Version 1.0.4 of the Amazon SQS Java Messaging Library' --- README.md | 2 +- pom.xml | 2 +- .../sqs/javamessaging/PrefetchManager.java | 6 + .../javamessaging/ProviderConfiguration.java | 4 +- .../sqs/javamessaging/SQSConnection.java | 6 +- .../SQSMessageConsumerPrefetch.java | 109 ++++++++-- .../sqs/javamessaging/SQSMessageProducer.java | 55 +++-- .../SQSMessagingClientConstants.java | 2 + .../SQSSessionCallbackScheduler.java | 6 +- .../sqs/javamessaging/message/SQSMessage.java | 10 +- .../SQSMessageConsumerPrefetchFifoTest.java | 32 ++- .../SQSMessageConsumerPrefetchTest.java | 192 +++++++++++++----- .../sqs/javamessaging/SQSSessionTest.java | 4 + 13 files changed, 335 insertions(+), 95 deletions(-) diff --git a/README.md b/README.md index 7d3481b..9f37666 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ for communicating with Amazon Simple Queue Service. This project builds on top o com.amazonaws amazon-sqs-java-messaging-lib - 1.0.3 + 1.0.4 jar ``` diff --git a/pom.xml b/pom.xml index 9995f14..3e6449e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.amazonaws amazon-sqs-java-messaging-lib - 1.0.3 + 1.0.4 jar Amazon SQS Java Messaging Library The Amazon SQS Java Messaging Library holds the Java Message Service compatible classes, that are used diff --git a/src/main/java/com/amazon/sqs/javamessaging/PrefetchManager.java b/src/main/java/com/amazon/sqs/javamessaging/PrefetchManager.java index e359af8..0964585 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/PrefetchManager.java +++ b/src/main/java/com/amazon/sqs/javamessaging/PrefetchManager.java @@ -27,6 +27,12 @@ public interface PrefetchManager { */ public void messageDispatched(); + /** + * Notify the prefetchThread that the message listener has finished with any + * previous message and is ready to accept another. + */ + public void messageListenerReady(); + /** * This is used to determine the state of the consumer, when the message * listener scheduler is processing the messages. diff --git a/src/main/java/com/amazon/sqs/javamessaging/ProviderConfiguration.java b/src/main/java/com/amazon/sqs/javamessaging/ProviderConfiguration.java index 283c274..284b003 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/ProviderConfiguration.java +++ b/src/main/java/com/amazon/sqs/javamessaging/ProviderConfiguration.java @@ -27,8 +27,8 @@ public int getNumberOfMessagesToPrefetch() { } public void setNumberOfMessagesToPrefetch(int numberOfMessagesToPrefetch) { - if (numberOfMessagesToPrefetch < SQSMessagingClientConstants.MIN_BATCH) { - throw new IllegalArgumentException(String.format("Invalid prefetch size. Provided value '%1$s' cannot be smaller than '%2$s'", numberOfMessagesToPrefetch, SQSMessagingClientConstants.MIN_BATCH)); + if (numberOfMessagesToPrefetch < SQSMessagingClientConstants.MIN_PREFETCH) { + throw new IllegalArgumentException(String.format("Invalid prefetch size. Provided value '%1$s' cannot be smaller than '%2$s'", numberOfMessagesToPrefetch, SQSMessagingClientConstants.MIN_PREFETCH)); } this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch; } diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java index aac88a1..38114cf 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java @@ -18,11 +18,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import javax.jms.IllegalStateException; - import javax.jms.Connection; - import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; import javax.jms.Destination; @@ -93,7 +90,8 @@ public class SQSConnection implements Connection, QueueConnection { /** * Configures the amount of messages that can be prefetched by a consumer. A - * single consumer cannot prefetch more than 10 messages. + * single consumer cannot prefetch more than 10 messages in a single call to SQS, + * but it will make multiple calls as necessary. */ private final int numberOfMessagesToPrefetch; private volatile boolean closed = false; diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java index dd30e9a..d5b222b 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java @@ -14,6 +14,7 @@ */ package com.amazon.sqs.javamessaging; +import java.net.URI; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Iterator; @@ -21,6 +22,7 @@ import java.util.Set; import java.util.UUID; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageListener; @@ -92,7 +94,14 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager { * Counter on how many messages are prefetched into internal messageQueue. */ protected int messagesPrefetched = 0; - + + /** + * Counter on how many messages have been explicitly requested. + * TODO: Consider renaming this class and several other variables now that + * this logic factors in message requests as well as prefetching. + */ + protected int messagesRequested = 0; + /** * States of the prefetch thread */ @@ -163,9 +172,24 @@ protected void setMessageListener(MessageListener messageListener) { List allPrefetchedMessages = new ArrayList(messageQueue); sqsSessionRunnable.scheduleCallBacks(messageListener, allPrefetchedMessages); messageQueue.clear(); + + // This will request the first message if necessary. + // TODO: This may overfetch if setMessageListener is being called multiple + // times, as the session callback scheduler may already have entries for this consumer. + messageListenerReady(); } } + /** + * Determine the number of messages we should attempt to fetch from SQS. + * Returns the difference between the number of messages needed (either for + * prefetching or by request) and the number currently fetched. + */ + private int numberOfMessagesToFetch() { + int numberOfMessagesNeeded = Math.max(numberOfMessagesToPrefetch, messagesRequested); + return Math.max(numberOfMessagesNeeded - messagesPrefetched, 0); + } + /** * Runs until the message consumer is closed and in-progress SQS * receiveMessage call returns. @@ -190,8 +214,7 @@ public void run() { synchronized (stateLock) { waitForStart(); waitForPrefetch(); - prefetchBatchSize = Math.min( - (numberOfMessagesToPrefetch - messagesPrefetched), SQSMessagingClientConstants.MAX_BATCH); + prefetchBatchSize = Math.min(numberOfMessagesToFetch(), SQSMessagingClientConstants.MAX_BATCH); } if (!isClosed()) { @@ -290,7 +313,7 @@ protected void processReceivedMessages(List messages) { protected void waitForPrefetch() throws InterruptedException { synchronized (stateLock) { - while (messagesPrefetched >= numberOfMessagesToPrefetch && !isClosed()) { + while (numberOfMessagesToFetch() <= 0 && !isClosed()) { try { stateLock.wait(); } catch (InterruptedException e) { @@ -332,7 +355,20 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep throw new JMSException("Not a supported JMS message type"); } } + jmsMessage.setJMSDestination(sqsDestination); + + MessageAttributeValue replyToQueueNameAttribute = message.getMessageAttributes().get( + SQSMessage.JMS_SQS_REPLY_TO_QUEUE_NAME); + MessageAttributeValue replyToQueueUrlAttribute = message.getMessageAttributes().get( + SQSMessage.JMS_SQS_REPLY_TO_QUEUE_URL); + if (replyToQueueNameAttribute != null && replyToQueueUrlAttribute != null) { + String replyToQueueUrl = replyToQueueUrlAttribute.getStringValue(); + String replyToQueueName = replyToQueueNameAttribute.getStringValue(); + Destination replyToQueue = new SQSQueueDestination(replyToQueueName, replyToQueueUrl); + jmsMessage.setJMSReplyTo(replyToQueue); + } + return jmsMessage; } @@ -366,12 +402,38 @@ protected void waitForStart() throws InterruptedException { public void messageDispatched() { synchronized (stateLock) { messagesPrefetched--; - if (messagesPrefetched < numberOfMessagesToPrefetch) { + messagesRequested--; + if (numberOfMessagesToFetch() > 0) { notifyStateChange(); } } } + @Override + public void messageListenerReady() { + synchronized (stateLock) { + // messagesRequested may still be more than zero if there were pending receive() + // calls when the message listener was set. + if (messagesRequested <= 0 && !isClosed() && messageListener != null) { + requestMessage(); + } + } + } + + void requestMessage() { + synchronized (stateLock) { + messagesRequested++; + notifyStateChange(); + } + } + + private void unrequestMessage() { + synchronized (stateLock) { + messagesRequested--; + notifyStateChange(); + } + } + public static class MessageManager { private final PrefetchManager prefetchManager; @@ -405,28 +467,35 @@ javax.jms.Message receive(long timeout) throws JMSException { timeout = 0; } - MessageManager messageManager; + MessageManager messageManager = null; synchronized (stateLock) { // If message exists in queue poll. if (!messageQueue.isEmpty()) { messageManager = messageQueue.pollFirst(); } else { - long startTime = System.currentTimeMillis(); - - long waitTime = 0; - while (messageQueue.isEmpty() && !isClosed() && - (timeout == 0 || (waitTime = getWaitTime(timeout, startTime)) > 0)) { - try { - stateLock.wait(waitTime); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + requestMessage(); + try { + long startTime = System.currentTimeMillis(); + + long waitTime = 0; + while (messageQueue.isEmpty() && !isClosed() && + (timeout == 0 || (waitTime = getWaitTime(timeout, startTime)) > 0)) { + try { + stateLock.wait(waitTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + if (messageQueue.isEmpty() || isClosed()) { return null; } - } - if (messageQueue.isEmpty() || isClosed()) { - return null; - } - messageManager = messageQueue.pollFirst(); + messageManager = messageQueue.pollFirst(); + } finally { + if (messageManager == null) { + unrequestMessage(); + } + } } } return messageHandler(messageManager); diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java index bb69741..e60ac7f 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java @@ -34,13 +34,13 @@ import com.amazon.sqs.javamessaging.message.SQSBytesMessage; import com.amazon.sqs.javamessaging.message.SQSMessage; +import com.amazon.sqs.javamessaging.message.SQSMessage.JMSMessagePropertyValue; import com.amazon.sqs.javamessaging.message.SQSObjectMessage; import com.amazon.sqs.javamessaging.message.SQSTextMessage; -import com.amazon.sqs.javamessaging.message.SQSMessage.JMSMessagePropertyValue; -import com.amazonaws.util.Base64; import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; +import com.amazonaws.util.Base64; /** * A client uses a MessageProducer object to send messages to a queue @@ -109,7 +109,15 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep throw new JMSException("Message body cannot be null or empty"); } Map messageAttributes = propertyToMessageAttribute((SQSMessage) message); - addMessageTypeReservedAttribute(messageAttributes, (SQSMessage) message, messageType); + + /** + * These will override existing attributes if they exist. Everything that + * has prefix JMS_ is reserved for JMS Provider, but if the user sets that + * attribute, it will be overwritten. + */ + addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_MESSAGE_TYPE, messageType); + addReplyToQueueReservedAttributes(messageAttributes, message); + SendMessageRequest sendMessageRequest = new SendMessageRequest(queue.getQueueUrl(), sqsMessageBody); sendMessageRequest.setMessageAttributes(messageAttributes); @@ -218,19 +226,42 @@ Map propertyToMessageAttribute(SQSMessage message private void addMessageTypeReservedAttribute(Map messageAttributes, SQSMessage message, String value) throws JMSException { - MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); - messageAttributeValue.setDataType(SQSMessagingClientConstants.STRING); - messageAttributeValue.setStringValue(value); + addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_MESSAGE_TYPE, value); + } - /** - * This will override the existing attribute if exists. Everything that - * has prefix JMS_ is reserved for JMS Provider, but if the user sets that - * attribute, it will be overwritten. - */ - messageAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE, messageAttributeValue); + /** + * Adds the reply-to queue name and url attributes during send as part of the send message + * request, if necessary + */ + private void addReplyToQueueReservedAttributes(Map messageAttributes, + SQSMessage message) throws JMSException { + + Destination replyTo = message.getJMSReplyTo(); + if (replyTo instanceof SQSQueueDestination) { + SQSQueueDestination replyToQueue = (SQSQueueDestination)replyTo; + + /** + * This will override the existing attributes if exists. Everything that + * has prefix JMS_ is reserved for JMS Provider, but if the user sets that + * attribute, it will be overwritten. + */ + addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_REPLY_TO_QUEUE_NAME, replyToQueue.getQueueName()); + addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_REPLY_TO_QUEUE_URL, replyToQueue.getQueueUrl()); + } } + /** + * Convenience method for adding a single string attribute. + */ + private void addStringAttribute(Map messageAttributes, + String key, String value) { + MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); + messageAttributeValue.setDataType(SQSMessagingClientConstants.STRING); + messageAttributeValue.setStringValue(value); + messageAttributes.put(key, messageAttributeValue); + } + /** * Sends a message to a queue. *

diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessagingClientConstants.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessagingClientConstants.java index b02db22..303a0f9 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessagingClientConstants.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessagingClientConstants.java @@ -27,6 +27,8 @@ public class SQSMessagingClientConstants { public static final int MAX_BATCH = 10; public static final int MIN_BATCH = 1; + + public static final int MIN_PREFETCH = 0; /** * JMSMessage available user property types, which are mapped to message diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java b/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java index 4c74194..9a804ad 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.java @@ -182,6 +182,10 @@ public void run() { } } finally { session.finishedCallback(); + + // Let the prefetch manager know we're available to + // process another message (if there is a still a listener attached). + messageManager.getPrefetchManager().messageListenerReady(); } } catch (Throwable ex) { LOG.error("Unexpected exception thrown during the run of the scheduled callback", ex); @@ -211,7 +215,7 @@ void scheduleCallBacks(MessageListener messageListener, List mes } } } - + void nackQueuedMessages() { synchronized (callbackQueue) { try { diff --git a/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java b/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java index 7b1c300..4609c8b 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java +++ b/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java @@ -31,6 +31,7 @@ import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch; import com.amazon.sqs.javamessaging.SQSMessagingClientConstants; +import com.amazon.sqs.javamessaging.SQSQueueDestination; import com.amazon.sqs.javamessaging.acknowledge.Acknowledger; import com.amazonaws.services.sqs.model.MessageAttributeValue; @@ -66,6 +67,8 @@ public class SQSMessage implements Message { public static final String OBJECT_MESSAGE_TYPE = "object"; public static final String TEXT_MESSAGE_TYPE = "text"; public static final String JMS_SQS_MESSAGE_TYPE = "JMS_SQSMessageType"; + public static final String JMS_SQS_REPLY_TO_QUEUE_NAME = "JMS_SQSReplyToQueueName"; + public static final String JMS_SQS_REPLY_TO_QUEUE_URL = "JMS_SQSReplyToQueueURL"; // Default JMS Message properties private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; @@ -76,7 +79,7 @@ public class SQSMessage implements Message { private long expiration = Message.DEFAULT_TIME_TO_LIVE; private String messageID; private String type; - private Destination replyTo; + private SQSQueueDestination replyTo; private Destination destination; private final Map properties = new HashMap(); @@ -320,7 +323,10 @@ public Destination getJMSReplyTo() throws JMSException { @Override public void setJMSReplyTo(Destination replyTo) throws JMSException { - this.replyTo = replyTo; + if (replyTo != null && !(replyTo instanceof SQSQueueDestination)) { + throw new IllegalArgumentException("The replyTo Destination must be a SQSQueueDestination"); + } + this.replyTo = (SQSQueueDestination)replyTo; } /** diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java index 4896090..dc8383f 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java @@ -35,12 +35,16 @@ import java.io.IOException; import java.io.ObjectOutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mockito.ArgumentMatcher; import static org.junit.Assert.assertEquals; @@ -55,12 +59,12 @@ /** * Test the SQSMessageConsumerPrefetchTest class */ +@RunWith(Parameterized.class) public class SQSMessageConsumerPrefetchFifoTest { private static final String NAMESPACE = "123456789012"; private static final String QUEUE_NAME = "QueueName.fifo"; private static final String QUEUE_URL = NAMESPACE + "/" + QUEUE_NAME; - private static final int NUMBER_OF_MESSAGES_TO_PREFETCH = 10; private Acknowledger acknowledger; private NegativeAcknowledger negativeAcknowledger; @@ -70,6 +74,17 @@ public class SQSMessageConsumerPrefetchFifoTest { private AmazonSQSMessagingClientWrapper amazonSQSClient; + @Parameters + public static List getParameters() { + return Arrays.asList(new Object[][] { {0}, {1}, {5}, {10}, {15} }); + } + + private final int numberOfMessagesToPrefetch; + + public SQSMessageConsumerPrefetchFifoTest(int numberOfMessagesToPrefetch) { + this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch; + } + @Before public void setup() { @@ -90,7 +105,7 @@ public void setup() { consumerPrefetch = spy(new SQSMessageConsumerPrefetch(sqsSessionRunnable, acknowledger, negativeAcknowledger, - sqsDestination, amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH)); + sqsDestination, amazonSQSClient, numberOfMessagesToPrefetch)); consumerPrefetch.backoffStrategy = backoffStrategy; } @@ -105,8 +120,9 @@ public void testOneFullPrefetch() throws InterruptedException, JMSException { * Set up consumer prefetch and mocks */ + final int numMessages = numberOfMessagesToPrefetch > 0 ? numberOfMessagesToPrefetch : 1; List messages = new ArrayList(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < numMessages; i++) { messages.add(createValidFifoMessage(i, "G" + i)); } @@ -114,6 +130,7 @@ public void testOneFullPrefetch() throws InterruptedException, JMSException { consumerPrefetch.start(); // Mock SQS call for receive message and return messages + final int receiveMessageLimit = Math.min(10, numMessages); when(amazonSQSClient.receiveMessage(argThat(new ArgumentMatcher() { @Override public boolean matches(Object argument) { @@ -122,7 +139,7 @@ public boolean matches(Object argument) { ReceiveMessageRequest other = (ReceiveMessageRequest)argument; return other.getQueueUrl().equals(QUEUE_URL) - && other.getMaxNumberOfMessages() == 10 + && other.getMaxNumberOfMessages() == receiveMessageLimit && other.getMessageAttributeNames().size() == 1 && other.getMessageAttributeNames().get(0).equals(SQSMessageConsumerPrefetch.ALL) && other.getWaitTimeSeconds() == SQSMessageConsumerPrefetch.WAIT_TIME_SECONDS @@ -139,6 +156,11 @@ public boolean matches(Object argument) { .thenReturn(false) .thenReturn(true); + /* + * Request a message (only relevant when prefetching is off). + */ + consumerPrefetch.requestMessage(); + /* * Run the prefetch */ @@ -161,7 +183,7 @@ public boolean matches(Object argument) { assertEquals(0, consumerPrefetch.retriesAttempted); // Ensure message queue was filled with expected messages - assertEquals(10, consumerPrefetch.messageQueue.size()); + assertEquals(numMessages, consumerPrefetch.messageQueue.size()); int index = 0; for (SQSMessageConsumerPrefetch.MessageManager messageManager : consumerPrefetch.messageQueue) { com.amazonaws.services.sqs.model.Message mockedMessage = messages.get(index); diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java index 9f8e5a2..23fb03d 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java @@ -14,74 +14,90 @@ */ package com.amazon.sqs.javamessaging; -import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; -import com.amazon.sqs.javamessaging.SQSConnection; -import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch; -import com.amazon.sqs.javamessaging.SQSQueueDestination; -import com.amazon.sqs.javamessaging.SQSSessionCallbackScheduler; -import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.MessageManager; -import com.amazon.sqs.javamessaging.acknowledge.Acknowledger; -import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger; -import com.amazon.sqs.javamessaging.message.SQSBytesMessage; -import com.amazon.sqs.javamessaging.message.SQSMessage; -import com.amazon.sqs.javamessaging.message.SQSObjectMessage; -import com.amazon.sqs.javamessaging.message.SQSTextMessage; -import com.amazon.sqs.javamessaging.util.ExponentialBackoffStrategy; -import com.amazonaws.util.Base64; -import com.amazonaws.services.sqs.model.*; - -import javax.jms.*; -import javax.jms.Message; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; + +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; +import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.MessageManager; +import com.amazon.sqs.javamessaging.acknowledge.Acknowledger; +import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger; +import com.amazon.sqs.javamessaging.message.SQSBytesMessage; +import com.amazon.sqs.javamessaging.message.SQSMessage; +import com.amazon.sqs.javamessaging.message.SQSObjectMessage; +import com.amazon.sqs.javamessaging.message.SQSTextMessage; +import com.amazon.sqs.javamessaging.util.ExponentialBackoffStrategy; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import com.amazonaws.util.Base64; /** * Test the SQSMessageConsumerPrefetchTest class */ @SuppressWarnings("unchecked") +@RunWith(Parameterized.class) public class SQSMessageConsumerPrefetchTest { private static final String NAMESPACE = "123456789012"; private static final String QUEUE_NAME = "QueueName"; private static final String QUEUE_URL = NAMESPACE + "/" + QUEUE_NAME; - private static final int NUMBER_OF_MESSAGES_TO_PREFETCH = 10; + + @Parameters + public static List getParameters() { + return Arrays.asList(new Object[][] { {0}, {1}, {5}, {10}, {15} }); + } + + private final int numberOfMessagesToPrefetch; private Acknowledger acknowledger; private NegativeAcknowledger negativeAcknowledger; @@ -92,6 +108,10 @@ public class SQSMessageConsumerPrefetchTest { private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); private AmazonSQSMessagingClientWrapper amazonSQSClient; + public SQSMessageConsumerPrefetchTest(int numberOfMessagesToPrefetch) { + this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch; + } + @Before public void setup() { @@ -112,7 +132,7 @@ public void setup() { consumerPrefetch = spy(new SQSMessageConsumerPrefetch(sqsSessionRunnable, acknowledger, negativeAcknowledger, - sqsDestination, amazonSQSClient, NUMBER_OF_MESSAGES_TO_PREFETCH)); + sqsDestination, amazonSQSClient, numberOfMessagesToPrefetch)); consumerPrefetch.backoffStrategy = backoffStrategy; } @@ -131,16 +151,18 @@ public void testEndToEnd() throws InterruptedException, JMSException { consumerPrefetch.start(); // Create messages return from SQS + final int numMessages = numberOfMessagesToPrefetch > 0 ? numberOfMessagesToPrefetch : 1; final List receipt = new ArrayList(); - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < numMessages; ++i) { receipt.add("r" + i); } ReceiveMessageResult receivedMessageResult = createReceiveMessageResult(receipt); // Mock SQS call for receive message and return messages + int receiveMessageLimit = Math.min(10, numMessages); when(amazonSQSClient.receiveMessage( eq(new ReceiveMessageRequest(QUEUE_URL) - .withMaxNumberOfMessages(10) + .withMaxNumberOfMessages(receiveMessageLimit) .withAttributeNames(SQSMessageConsumerPrefetch.ALL) .withMessageAttributeNames(SQSMessageConsumerPrefetch.ALL) .withWaitTimeSeconds(SQSMessageConsumerPrefetch.WAIT_TIME_SECONDS)))) @@ -157,7 +179,7 @@ public void testEndToEnd() throws InterruptedException, JMSException { public Boolean answer(InvocationOnMock invocation) throws Throwable { // Ensure message queue was filled with expected messages //after we return 'isClosed() == true' we will empty the prefetch queue while nacking messages - assertEquals(10, consumerPrefetch.messageQueue.size()); + assertEquals(numMessages, consumerPrefetch.messageQueue.size()); for (SQSMessageConsumerPrefetch.MessageManager messageManager : consumerPrefetch.messageQueue) { SQSMessage sqsMessage = (SQSMessage)messageManager.getMessage(); assertTrue(receipt.contains(sqsMessage.getReceiptHandle())); @@ -167,6 +189,11 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable { } }); + /* + * Request a message (only relevant when prefetching is off). + */ + consumerPrefetch.requestMessage(); + /* * Run the prefetch */ @@ -189,7 +216,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable { assertEquals(0, consumerPrefetch.retriesAttempted); // Ensure message queue was filled with expected messages - assertEquals(10, consumerPrefetch.messageQueue.size()); + assertEquals(numMessages, consumerPrefetch.messageQueue.size()); for (SQSMessageConsumerPrefetch.MessageManager messageManager : consumerPrefetch.messageQueue) { SQSMessage sqsMessage = (SQSMessage)messageManager.getMessage(); assertTrue(receipt.contains(sqsMessage.getReceiptHandle())); @@ -712,7 +739,7 @@ public void testWaitForPrefetchLimitReached() throws InterruptedException { /* * Set up consumer prefetch and mocks */ - consumerPrefetch.messagesPrefetched = NUMBER_OF_MESSAGES_TO_PREFETCH + 5; + consumerPrefetch.messagesPrefetched = numberOfMessagesToPrefetch + 5; final CountDownLatch beforeWaitForPrefetchCall = new CountDownLatch(1); final CountDownLatch passedWaitForPrefetch = new CountDownLatch(1); @@ -741,7 +768,7 @@ public void run() { assertEquals(false, passedWaitForPrefetch.await(3, TimeUnit.SECONDS)); // Simulate messages were processes - consumerPrefetch.messagesPrefetched = NUMBER_OF_MESSAGES_TO_PREFETCH - 1; + consumerPrefetch.messagesPrefetched = numberOfMessagesToPrefetch - 1; // Release the local and ensure that we no longer waiting since the prefetch message is below the limit consumerPrefetch.notifyStateChange(); @@ -757,7 +784,7 @@ public void testWaitForPrefetchIsClosed() throws InterruptedException { /* * Set up consumer prefetch and mocks */ - consumerPrefetch.messagesPrefetched = NUMBER_OF_MESSAGES_TO_PREFETCH + 5; + consumerPrefetch.messagesPrefetched = numberOfMessagesToPrefetch + 5; consumerPrefetch.close(); final CountDownLatch beforeWaitForPrefetchCall = new CountDownLatch(1); @@ -793,7 +820,7 @@ public void testWaitForPrefetchInterrupted() throws InterruptedException { /* * Set up consumer prefetch and mocks */ - consumerPrefetch.messagesPrefetched = NUMBER_OF_MESSAGES_TO_PREFETCH + 5; + consumerPrefetch.messagesPrefetched = numberOfMessagesToPrefetch + 5; final CountDownLatch beforeWaitForPrefetchCall = new CountDownLatch(1); final CountDownLatch recvInterruptedExceptionLatch = new CountDownLatch(1); @@ -1736,6 +1763,77 @@ public void testClose() throws InterruptedException, JMSException { assertTrue(consumerPrefetch.closed); } + /** + * Test that concurrent receive requests results in fetching more messages + * from the queue with a single request, even if prefetching is set lower or even to 0. + */ + @Test + public void testRequestedMessageTracking() throws InterruptedException, JMSException, ExecutionException { + int concurrentReceives = 3; + int receiveBatchSize = Math.min(SQSMessagingClientConstants.MAX_BATCH, + Math.max(concurrentReceives, numberOfMessagesToPrefetch)); + + // Create messages return from SQS + final List receipt = new ArrayList(); + for (int i = 0; i < receiveBatchSize; ++i) { + receipt.add("r" + i); + } + ReceiveMessageResult receivedMessageResult = createReceiveMessageResult(receipt); + + // Mock SQS call for receive message and return messages + when(amazonSQSClient.receiveMessage( + eq(new ReceiveMessageRequest(QUEUE_URL) + .withMaxNumberOfMessages(receiveBatchSize) + .withAttributeNames(SQSMessageConsumerPrefetch.ALL) + .withMessageAttributeNames(SQSMessageConsumerPrefetch.ALL) + .withWaitTimeSeconds(SQSMessageConsumerPrefetch.WAIT_TIME_SECONDS)))) + .thenReturn(receivedMessageResult); + + final CountDownLatch allReceivesWaiting = new CountDownLatch(concurrentReceives); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + invocation.callRealMethod(); + allReceivesWaiting.countDown(); + return null; + } + }).when(consumerPrefetch).requestMessage(); + + // Close the prefetcher immediately after completing one loop + final List> receivedMessageFutures = new ArrayList>(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + invocation.callRealMethod(); + for (Future messageFuture : receivedMessageFutures) { + Assert.assertNotNull(messageFuture.get()); + } + consumerPrefetch.close(); + return null; + } + }).when(consumerPrefetch).processReceivedMessages(any(List.class)); + + // Set running to true first so that the receive calls don't terminate early + consumerPrefetch.running = true; + + ExecutorService receiveExecutor = Executors.newFixedThreadPool(concurrentReceives); + for (int i = 0; i < concurrentReceives; i++) { + receivedMessageFutures.add(receiveExecutor.submit(new Callable() { + @Override + public Message call() throws Exception { + return consumerPrefetch.receive(); + } + })); + } + + // Wait to make sure the receive calls have gotten far enough to + // wait on the message queue + allReceivesWaiting.await(); + + Assert.assertEquals(concurrentReceives, consumerPrefetch.messagesRequested); + + consumerPrefetch.run(); + } /* * Utility functions diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java index 6d9a7e6..dec5842 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java @@ -502,6 +502,10 @@ public void messageDispatched() { } } + @Override + public void messageListenerReady() { + } + @Override public SQSMessageConsumer getMessageConsumer() { return consumer1;