Skip to content

Commit

Permalink
'Version 1.0.4 of the Amazon SQS Java Messaging Library'
Browse files Browse the repository at this point in the history
  • Loading branch information
robin-aws committed Aug 10, 2017
1 parent 9004d64 commit ec0677b
Show file tree
Hide file tree
Showing 13 changed files with 335 additions and 95 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ for communicating with Amazon Simple Queue Service. This project builds on top o
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-messaging-lib</artifactId>
<version>1.0.3</version>
<version>1.0.4</version>
<type>jar</type>
</dependency>
```
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-messaging-lib</artifactId>
<version>1.0.3</version>
<version>1.0.4</version>
<packaging>jar</packaging>
<name>Amazon SQS Java Messaging Library</name>
<description>The Amazon SQS Java Messaging Library holds the Java Message Service compatible classes, that are used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public interface PrefetchManager {
*/
public void messageDispatched();

/**
* Notify the prefetchThread that the message listener has finished with any
* previous message and is ready to accept another.
*/
public void messageListenerReady();

/**
* This is used to determine the state of the consumer, when the message
* listener scheduler is processing the messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public int getNumberOfMessagesToPrefetch() {
}

public void setNumberOfMessagesToPrefetch(int numberOfMessagesToPrefetch) {
if (numberOfMessagesToPrefetch < SQSMessagingClientConstants.MIN_BATCH) {
throw new IllegalArgumentException(String.format("Invalid prefetch size. Provided value '%1$s' cannot be smaller than '%2$s'", numberOfMessagesToPrefetch, SQSMessagingClientConstants.MIN_BATCH));
if (numberOfMessagesToPrefetch < SQSMessagingClientConstants.MIN_PREFETCH) {
throw new IllegalArgumentException(String.format("Invalid prefetch size. Provided value '%1$s' cannot be smaller than '%2$s'", numberOfMessagesToPrefetch, SQSMessagingClientConstants.MIN_PREFETCH));
}
this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch;
}
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


import javax.jms.IllegalStateException;

import javax.jms.Connection;

import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
Expand Down Expand Up @@ -93,7 +90,8 @@ public class SQSConnection implements Connection, QueueConnection {

/**
* Configures the amount of messages that can be prefetched by a consumer. A
* single consumer cannot prefetch more than 10 messages.
* single consumer cannot prefetch more than 10 messages in a single call to SQS,
* but it will make multiple calls as necessary.
*/
private final int numberOfMessagesToPrefetch;
private volatile boolean closed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
*/
package com.amazon.sqs.javamessaging;

import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;

Expand Down Expand Up @@ -92,7 +94,14 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
* Counter on how many messages are prefetched into internal messageQueue.
*/
protected int messagesPrefetched = 0;


/**
* Counter on how many messages have been explicitly requested.
* TODO: Consider renaming this class and several other variables now that
* this logic factors in message requests as well as prefetching.
*/
protected int messagesRequested = 0;

/**
* States of the prefetch thread
*/
Expand Down Expand Up @@ -163,9 +172,24 @@ protected void setMessageListener(MessageListener messageListener) {
List<MessageManager> allPrefetchedMessages = new ArrayList<MessageManager>(messageQueue);
sqsSessionRunnable.scheduleCallBacks(messageListener, allPrefetchedMessages);
messageQueue.clear();

// This will request the first message if necessary.
// TODO: This may overfetch if setMessageListener is being called multiple
// times, as the session callback scheduler may already have entries for this consumer.
messageListenerReady();
}
}

/**
* Determine the number of messages we should attempt to fetch from SQS.
* Returns the difference between the number of messages needed (either for
* prefetching or by request) and the number currently fetched.
*/
private int numberOfMessagesToFetch() {
int numberOfMessagesNeeded = Math.max(numberOfMessagesToPrefetch, messagesRequested);
return Math.max(numberOfMessagesNeeded - messagesPrefetched, 0);
}

/**
* Runs until the message consumer is closed and in-progress SQS
* <code>receiveMessage</code> call returns.
Expand All @@ -190,8 +214,7 @@ public void run() {
synchronized (stateLock) {
waitForStart();
waitForPrefetch();
prefetchBatchSize = Math.min(
(numberOfMessagesToPrefetch - messagesPrefetched), SQSMessagingClientConstants.MAX_BATCH);
prefetchBatchSize = Math.min(numberOfMessagesToFetch(), SQSMessagingClientConstants.MAX_BATCH);
}

if (!isClosed()) {
Expand Down Expand Up @@ -290,7 +313,7 @@ protected void processReceivedMessages(List<Message> messages) {

protected void waitForPrefetch() throws InterruptedException {
synchronized (stateLock) {
while (messagesPrefetched >= numberOfMessagesToPrefetch && !isClosed()) {
while (numberOfMessagesToFetch() <= 0 && !isClosed()) {
try {
stateLock.wait();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -332,7 +355,20 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
throw new JMSException("Not a supported JMS message type");
}
}

jmsMessage.setJMSDestination(sqsDestination);

MessageAttributeValue replyToQueueNameAttribute = message.getMessageAttributes().get(
SQSMessage.JMS_SQS_REPLY_TO_QUEUE_NAME);
MessageAttributeValue replyToQueueUrlAttribute = message.getMessageAttributes().get(
SQSMessage.JMS_SQS_REPLY_TO_QUEUE_URL);
if (replyToQueueNameAttribute != null && replyToQueueUrlAttribute != null) {
String replyToQueueUrl = replyToQueueUrlAttribute.getStringValue();
String replyToQueueName = replyToQueueNameAttribute.getStringValue();
Destination replyToQueue = new SQSQueueDestination(replyToQueueName, replyToQueueUrl);
jmsMessage.setJMSReplyTo(replyToQueue);
}

return jmsMessage;
}

Expand Down Expand Up @@ -366,12 +402,38 @@ protected void waitForStart() throws InterruptedException {
public void messageDispatched() {
synchronized (stateLock) {
messagesPrefetched--;
if (messagesPrefetched < numberOfMessagesToPrefetch) {
messagesRequested--;
if (numberOfMessagesToFetch() > 0) {
notifyStateChange();
}
}
}

@Override
public void messageListenerReady() {
synchronized (stateLock) {
// messagesRequested may still be more than zero if there were pending receive()
// calls when the message listener was set.
if (messagesRequested <= 0 && !isClosed() && messageListener != null) {
requestMessage();
}
}
}

void requestMessage() {
synchronized (stateLock) {
messagesRequested++;
notifyStateChange();
}
}

private void unrequestMessage() {
synchronized (stateLock) {
messagesRequested--;
notifyStateChange();
}
}

public static class MessageManager {

private final PrefetchManager prefetchManager;
Expand Down Expand Up @@ -405,28 +467,35 @@ javax.jms.Message receive(long timeout) throws JMSException {
timeout = 0;
}

MessageManager messageManager;
MessageManager messageManager = null;
synchronized (stateLock) {
// If message exists in queue poll.
if (!messageQueue.isEmpty()) {
messageManager = messageQueue.pollFirst();
} else {
long startTime = System.currentTimeMillis();

long waitTime = 0;
while (messageQueue.isEmpty() && !isClosed() &&
(timeout == 0 || (waitTime = getWaitTime(timeout, startTime)) > 0)) {
try {
stateLock.wait(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
requestMessage();
try {
long startTime = System.currentTimeMillis();

long waitTime = 0;
while (messageQueue.isEmpty() && !isClosed() &&
(timeout == 0 || (waitTime = getWaitTime(timeout, startTime)) > 0)) {
try {
stateLock.wait(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
if (messageQueue.isEmpty() || isClosed()) {
return null;
}
}
if (messageQueue.isEmpty() || isClosed()) {
return null;
}
messageManager = messageQueue.pollFirst();
messageManager = messageQueue.pollFirst();
} finally {
if (messageManager == null) {
unrequestMessage();
}
}
}
}
return messageHandler(messageManager);
Expand Down
55 changes: 43 additions & 12 deletions src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@

import com.amazon.sqs.javamessaging.message.SQSBytesMessage;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import com.amazon.sqs.javamessaging.message.SQSMessage.JMSMessagePropertyValue;
import com.amazon.sqs.javamessaging.message.SQSObjectMessage;
import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import com.amazon.sqs.javamessaging.message.SQSMessage.JMSMessagePropertyValue;
import com.amazonaws.util.Base64;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.amazonaws.util.Base64;

/**
* A client uses a MessageProducer object to send messages to a queue
Expand Down Expand Up @@ -109,7 +109,15 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
throw new JMSException("Message body cannot be null or empty");
}
Map<String, MessageAttributeValue> messageAttributes = propertyToMessageAttribute((SQSMessage) message);
addMessageTypeReservedAttribute(messageAttributes, (SQSMessage) message, messageType);

/**
* These will override existing attributes if they exist. Everything that
* has prefix JMS_ is reserved for JMS Provider, but if the user sets that
* attribute, it will be overwritten.
*/
addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_MESSAGE_TYPE, messageType);
addReplyToQueueReservedAttributes(messageAttributes, message);

SendMessageRequest sendMessageRequest = new SendMessageRequest(queue.getQueueUrl(), sqsMessageBody);
sendMessageRequest.setMessageAttributes(messageAttributes);

Expand Down Expand Up @@ -218,19 +226,42 @@ Map<String, MessageAttributeValue> propertyToMessageAttribute(SQSMessage message
private void addMessageTypeReservedAttribute(Map<String, MessageAttributeValue> messageAttributes,
SQSMessage message, String value) throws JMSException {

MessageAttributeValue messageAttributeValue = new MessageAttributeValue();

messageAttributeValue.setDataType(SQSMessagingClientConstants.STRING);
messageAttributeValue.setStringValue(value);
addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_MESSAGE_TYPE, value);
}

/**
* This will override the existing attribute if exists. Everything that
* has prefix JMS_ is reserved for JMS Provider, but if the user sets that
* attribute, it will be overwritten.
*/
messageAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE, messageAttributeValue);
/**
* Adds the reply-to queue name and url attributes during send as part of the send message
* request, if necessary
*/
private void addReplyToQueueReservedAttributes(Map<String, MessageAttributeValue> messageAttributes,
SQSMessage message) throws JMSException {

Destination replyTo = message.getJMSReplyTo();
if (replyTo instanceof SQSQueueDestination) {
SQSQueueDestination replyToQueue = (SQSQueueDestination)replyTo;

/**
* This will override the existing attributes if exists. Everything that
* has prefix JMS_ is reserved for JMS Provider, but if the user sets that
* attribute, it will be overwritten.
*/
addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_REPLY_TO_QUEUE_NAME, replyToQueue.getQueueName());
addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_REPLY_TO_QUEUE_URL, replyToQueue.getQueueUrl());
}
}

/**
* Convenience method for adding a single string attribute.
*/
private void addStringAttribute(Map<String, MessageAttributeValue> messageAttributes,
String key, String value) {
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setDataType(SQSMessagingClientConstants.STRING);
messageAttributeValue.setStringValue(value);
messageAttributes.put(key, messageAttributeValue);
}

/**
* Sends a message to a queue.
* <P>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class SQSMessagingClientConstants {
public static final int MAX_BATCH = 10;

public static final int MIN_BATCH = 1;

public static final int MIN_PREFETCH = 0;

/**
* JMSMessage available user property types, which are mapped to message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ public void run() {
}
} finally {
session.finishedCallback();

// Let the prefetch manager know we're available to
// process another message (if there is a still a listener attached).
messageManager.getPrefetchManager().messageListenerReady();
}
} catch (Throwable ex) {
LOG.error("Unexpected exception thrown during the run of the scheduled callback", ex);
Expand Down Expand Up @@ -211,7 +215,7 @@ void scheduleCallBacks(MessageListener messageListener, List<MessageManager> mes
}
}
}

void nackQueuedMessages() {
synchronized (callbackQueue) {
try {
Expand Down
Loading

0 comments on commit ec0677b

Please sign in to comment.