Skip to content

Commit

Permalink
refs #410: pull out annotations to own package
Browse files Browse the repository at this point in the history
  • Loading branch information
Jaiden Ashmore authored and Jaiden Ashmore committed Oct 6, 2024
1 parent 5630a7a commit 4bd7ea0
Show file tree
Hide file tree
Showing 110 changed files with 1,322 additions and 1,273 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The following provides some examples using the library with different languages
```

1. In one of your beans, attach a
[@QueueListener](./spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java)
[@QueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListener.java)
annotation to a method indicating that it should process messages from a queue.

```java
Expand Down Expand Up @@ -572,7 +572,7 @@ lambdaProcessor {
The [Spring Cloud AWS Messaging](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-messaging) `@SqsListener` works by requesting
a set of messages from the SQS and when they are done it will request some more. There is one disadvantage with this approach in that if 9/10 of the messages
finish in 10 milliseconds but one takes 10 seconds no other messages will be picked up until that last message is complete. The
[@QueueListener](./spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java)
[@QueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/basic/QueueListener.java)
provides the same basic functionality, but it also provides a timeout where it will eventually request for more messages when there are threads that are
ready for another message.
Expand Down Expand Up @@ -619,7 +619,7 @@ not prefetch anymore._
#### Spring Boot
The [@PrefetchingQueueListener](./spring/spring-core/src/main/java/com/jashmore/sqs/spring/container/prefetch/PrefetchingQueueListener.java)
The [@PrefetchingQueueListener](./annotations/src/main/java/com/jashmore/sqs/annotations/core/prefetch/PrefetchingQueueListener.java)
annotation can be used to prefetch messages in a background thread while processing the existing messages. The usage is something like this:
```java
Expand Down
10 changes: 10 additions & 0 deletions annotations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Java Dynamic SQS Listener Annotations

Wrapper around the core library that allows for setting up using annotations to simplify the usage.
ore message listener.

## More Information

For more information you can look at the root project [README.md](../README.md) which provides more information about the architecture
of the application. The [API](../api) is also a good location to find more information about what each part of the framework is how
they interact with each other.
11 changes: 11 additions & 0 deletions annotations/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

description = "Contains a way to attach message listeners via annotations"

dependencies {
api(project(":java-dynamic-sqs-listener-core"))
implementation(project(":common-utils"))
implementation(project(":annotation-utils"))
compileOnly(project(":documentation-annotations"))

testImplementation(project(":elasticmq-sqs-client"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.jashmore.sqs.annotations.container;

import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.argument.ArgumentResolverService;
import com.jashmore.sqs.client.QueueResolver;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import com.jashmore.sqs.container.MessageListenerContainer;
import com.jashmore.sqs.container.MessageListenerContainerFactory;
import com.jashmore.sqs.container.MessageListenerContainerInitialisationException;
import com.jashmore.sqs.processor.CoreMessageProcessor;
import com.jashmore.sqs.processor.DecoratingMessageProcessorFactory;
import com.jashmore.sqs.processor.MessageProcessor;
import com.jashmore.sqs.util.annotation.AnnotationUtils;
import com.jashmore.sqs.util.identifier.IdentifierUtils;
import com.jashmore.sqs.util.string.StringUtils;
import lombok.Builder;
import org.immutables.value.Value;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* {@link MessageListenerContainerFactory} that can be used to build against an annotated method
* @param <A>
*/
public class AnnotationMessageListenerContainerFactory<A extends Annotation> implements MessageListenerContainerFactory {
private final Class<A> annotationClass;
private final Function<A, String> identifierMapper;
private final Function<A, String> sqsClientIdentifier;
private final Function<A, String> queueNameOrUrlMapper;
private final QueueResolver queueResolver;
private final SqsAsyncClientProvider sqsAsyncClientProvider;
private final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory;
private final ArgumentResolverService argumentResolverService;
private final Function<AnnotationDetails<A>, MessageListenerContainer> containerFactory;

/**
* Constructor.
*
* @param annotationClass the class instance of the annotation
* @param identifierMapper to convert an annotation to the identifier of the listener
* @param sqsClientIdentifierMapper to convert an annotation to the SQS Client identifier
* @param queueNameOrUrlMapper to convert an annotation to the Queue URL or name
* @param queueResolver to resolve queue names to a URL
* @param sqsAsyncClientProvider the method for obtaining a SQS client from the identifier
* @param decoratingMessageProcessorFactory to wrap the message processing with any decorators
* @param argumentResolverService to map the parameters of the method to values in the message
* @param containerFactory converts details about the annotation to the final {@link MessageListenerContainer}
*/
public AnnotationMessageListenerContainerFactory(final Class<A> annotationClass,
final Function<A, String> identifierMapper,
final Function<A, String> sqsClientIdentifierMapper,
final Function<A, String> queueNameOrUrlMapper,
final QueueResolver queueResolver,
final SqsAsyncClientProvider sqsAsyncClientProvider,
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory,
final ArgumentResolverService argumentResolverService,
final Function<AnnotationDetails<A>, MessageListenerContainer> containerFactory) {
this.annotationClass = annotationClass;
this.identifierMapper = identifierMapper;
this.sqsClientIdentifier = sqsClientIdentifierMapper;
this.queueNameOrUrlMapper = queueNameOrUrlMapper;
this.queueResolver = queueResolver;
this.sqsAsyncClientProvider = sqsAsyncClientProvider;
this.decoratingMessageProcessorFactory = decoratingMessageProcessorFactory;
this.argumentResolverService = argumentResolverService;
this.containerFactory = containerFactory;
}

@Override
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method) throws MessageListenerContainerInitialisationException {
return AnnotationUtils
.findMethodAnnotation(method, this.annotationClass)
.map(annotation -> {
final SqsAsyncClient sqsAsyncClient = getSqsAsyncClient(annotation);
final QueueProperties queueProperties = QueueProperties.builder()
.queueUrl(queueResolver.resolveQueueUrl(sqsAsyncClient, queueNameOrUrlMapper.apply(annotation)))
.build();
final String identifier = IdentifierUtils.buildIdentifierForMethod(identifierMapper.apply(annotation), bean.getClass(), method);

final Supplier<MessageProcessor> messageProcessorSupplier = () ->
decoratingMessageProcessorFactory.decorateMessageProcessor(
sqsAsyncClient,
identifier,
queueProperties,
bean,
method,
new CoreMessageProcessor(argumentResolverService, queueProperties, sqsAsyncClient, method, bean)
);

return containerFactory.apply(AnnotationDetails.<A>builder()
.identifier(identifier)
.queueProperties(queueProperties)
.sqsAsyncClient(sqsAsyncClient)
.messageProcessorSupplier(messageProcessorSupplier)
.annotation(annotation)
.build());
});
}

private SqsAsyncClient getSqsAsyncClient(final A annotation) {
final String sqsClient = sqsClientIdentifier.apply(annotation);

if (!StringUtils.hasText(sqsClient)) {
return sqsAsyncClientProvider
.getDefaultClient()
.orElseThrow(() -> new MessageListenerContainerInitialisationException("Expected the default SQS Client but there is none")
);
}

return sqsAsyncClientProvider
.getClient(sqsClient)
.orElseThrow(() ->
new MessageListenerContainerInitialisationException("Expected a client with id '" + sqsClient + "' but none were found")
);
}

@Value
@Builder
public static class AnnotationDetails<A extends Annotation> {
public String identifier;
public SqsAsyncClient sqsAsyncClient;
public QueueProperties queueProperties;
public Supplier<MessageProcessor> messageProcessorSupplier;
public A annotation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.jashmore.sqs.annotations.core.basic;

import com.jashmore.sqs.annotations.container.AnnotationMessageListenerContainerFactory;
import com.jashmore.sqs.argument.ArgumentResolverService;
import com.jashmore.sqs.client.QueueResolver;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import com.jashmore.sqs.container.MessageListenerContainer;
import com.jashmore.sqs.container.MessageListenerContainerFactory;
import com.jashmore.sqs.container.MessageListenerContainerInitialisationException;
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainer;
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainerProperties;
import com.jashmore.sqs.processor.DecoratingMessageProcessorFactory;

import java.lang.reflect.Method;
import java.util.Optional;

/**
* {@link MessageListenerContainerFactory} that will wrap methods annotated with
* {@link QueueListener @QueueListener} with some predefined implementations of the framework.
*/
public class BasicAnnotationMessageListenerContainerFactory implements MessageListenerContainerFactory {

private final AnnotationMessageListenerContainerFactory<QueueListener> delegate;

public BasicAnnotationMessageListenerContainerFactory(
final ArgumentResolverService argumentResolverService,
final SqsAsyncClientProvider sqsAsyncClientProvider,
final QueueResolver queueResolver,
final QueueListenerParser queueListenerParser,
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory
) {
this.delegate = new AnnotationMessageListenerContainerFactory<>(
QueueListener.class,
QueueListener::identifier,
QueueListener::sqsClient,
QueueListener::value,
queueResolver,
sqsAsyncClientProvider,
decoratingMessageProcessorFactory,
argumentResolverService,
(details) -> {
final BatchingMessageListenerContainerProperties properties = queueListenerParser.parse(details.annotation);
return new BatchingMessageListenerContainer(
details.identifier,
details.queueProperties,
details.sqsAsyncClient,
details.messageProcessorSupplier,
properties
);
}
);
}

@Override
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method) throws MessageListenerContainerInitialisationException {
return this.delegate.buildContainer(bean, method);
}
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
package com.jashmore.sqs.spring.container.basic;
package com.jashmore.sqs.annotations.core.basic;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.aws.AwsConstants;
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
import com.jashmore.sqs.container.MessageListenerContainer;
import com.jashmore.sqs.processor.CoreMessageProcessor;
import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever;
import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties;
import com.jashmore.sqs.spring.client.SqsAsyncClientProvider;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import org.springframework.core.env.Environment;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

/**
* Wrap a method with a {@link MessageListenerContainer} that will execute the method whenever a message is received on the provided queue.
*
* <p>This is a simplified annotation that uses the {@link ConcurrentMessageBroker}, {@link BatchingMessageRetriever} and {@link CoreMessageProcessor}
* for the implementations of the framework. Not all of the properties for each implementation are available to simplify this usage.
* for the implementations of the framework. Not all the properties for each implementation are available to simplify this usage.
*
* @see BasicMessageListenerContainerFactory for what processes this annotation
* @see BasicAnnotationMessageListenerContainerFactory for what processes this annotation
*/
@Retention(RUNTIME)
@Target(METHOD)
Expand All @@ -40,7 +40,7 @@
* </ul>
*
* @return the queue name or URL of the queue
* @see Environment#resolveRequiredPlaceholders(String) for how the placeholders are resolved
* @see PlaceholderResolver#resolvePlaceholders(String) for how the placeholders are resolved
* @see QueueProperties#getQueueUrl() for how the URL of the queue is resolved if a queue name is supplied here
*/
String value();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
package com.jashmore.sqs.spring.container.basic;
package com.jashmore.sqs.annotations.core.basic;

import com.jashmore.documentation.annotations.Max;
import com.jashmore.documentation.annotations.Nullable;
import com.jashmore.documentation.annotations.Positive;
import com.jashmore.documentation.annotations.PositiveOrZero;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.aws.AwsConstants;
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainerProperties;
import com.jashmore.sqs.spring.container.CoreAnnotationParser;
import com.jashmore.sqs.util.string.StringUtils;

import java.time.Duration;
import java.util.function.Supplier;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;

/**
* Parser that is used to transform a {@link QueueListener} annotation to a {@link BatchingMessageListenerContainerProperties}.
*/
public class QueueListenerParser implements CoreAnnotationParser<QueueListener, BatchingMessageListenerContainerProperties> {
public class QueueListenerParser {

private final Environment environment;
private final PlaceholderResolver placeholderResolver;

public QueueListenerParser(final Environment environment) {
this.environment = environment;
public QueueListenerParser(final PlaceholderResolver placeholderResolver) {
this.placeholderResolver = placeholderResolver;
}

@Override
public BatchingMessageListenerContainerProperties parse(QueueListener annotation) {
final Supplier<Integer> concurrencySupplier = concurrencySupplier(annotation);
final Supplier<Duration> concurrencyPollingRateSupplier = concurrencyPollingRateSupplier(annotation);
Expand Down Expand Up @@ -102,7 +101,7 @@ protected Supplier<Integer> concurrencySupplier(final QueueListener annotation)
if (!StringUtils.hasText(annotation.concurrencyLevelString())) {
concurrencyLevel = annotation.concurrencyLevel();
} else {
concurrencyLevel = Integer.parseInt(environment.resolvePlaceholders(annotation.concurrencyLevelString()));
concurrencyLevel = Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.concurrencyLevelString()));
}
return () -> concurrencyLevel;
}
Expand Down Expand Up @@ -134,7 +133,7 @@ protected Supplier<Integer> batchSizeSupplier(final QueueListener annotation) {
if (!StringUtils.hasText(annotation.batchSizeString())) {
batchSize = annotation.batchSize();
} else {
batchSize = Integer.parseInt(environment.resolvePlaceholders(annotation.batchSizeString()));
batchSize = Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.batchSizeString()));
}

return () -> batchSize;
Expand All @@ -154,7 +153,7 @@ protected Supplier<Duration> batchingPeriodSupplier(final QueueListener annotati
if (!StringUtils.hasText(annotation.batchingPeriodInMsString())) {
batchingPeriod = Duration.ofMillis(annotation.batchingPeriodInMs());
} else {
batchingPeriod = Duration.ofMillis(Integer.parseInt(environment.resolvePlaceholders(annotation.batchingPeriodInMsString())));
batchingPeriod = Duration.ofMillis(Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.batchingPeriodInMsString())));
}
return () -> batchingPeriod;
}
Expand Down Expand Up @@ -193,7 +192,7 @@ protected Supplier<Duration> messageVisibilityTimeoutSupplier(final QueueListene
}
} else {
messageVisibilityTimeout =
Duration.ofSeconds(Integer.parseInt(environment.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString())));
Duration.ofSeconds(Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString())));
}

return () -> messageVisibilityTimeout;
Expand Down
Loading

0 comments on commit 4bd7ea0

Please sign in to comment.