New serverless pattern - Serverless Messaging Redrive #2543

Open · wants to merge 2 commits into base: `main` · Changes from 1 commit
58 changes: 58 additions & 0 deletions serverless-message-processing/README.md
# Serverless Message Processing Pattern

## Overview
An adaptable pattern for message processing using AWS serverless services, featuring error handling and automatic recovery mechanisms.

## Core Components
- API Gateway (message ingestion)
- SQS Queues (main + DLQs)
- Lambda Functions (processing + recovery)

## Basic Flow
1. Messages enter through API Gateway
2. The main SQS queue receives and durably stores the messages
3. A processor Lambda function polls the main queue via an Event Source Mapping (ESM) and handles the messages

> **Review comment:** Would include that this Lambda function polls the main queue using Event Source Mappings (ESMs) and handles the messages. Include links to docs / blogs / posts where they can build understanding.

4. Messages that cannot be processed (for example, a malformed email address) are routed to a DLQ once their retries are exhausted

> **Review comment:** If you're going to use terse statements in a numbered or bulleted list, it would be good to include a fuller explanation alongside them. In this context, what's a "failed message"? See if you can go a level deeper to explain what's going on, as it may be confusing to somebody unfamiliar with the pattern or certain service features, like DLQs.

5. A decision-maker Lambda function attempts automated recovery, redelivering fixed messages to the main queue
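To make steps 2-4 concrete, here is a minimal sketch of what a processor Lambda for this pattern could look like. This is illustrative, not the repository's actual processor: it assumes the message body carries an `email` field and uses the standard SQS partial-batch-response contract so that only failed messages are retried.

```python
import json
import re

# Sketch only: an illustrative processor Lambda. The SQS Event Source Mapping
# (ESM) polls the main queue and invokes this handler with batches of records.
EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

def lambda_handler(event, context):
    # Collect per-message failures so only the bad messages are retried;
    # after maxReceiveCount retries, SQS moves them to the DLQ automatically.
    failures = []
    for record in event['Records']:
        body = json.loads(record['body'])
        if not re.match(EMAIL_PATTERN, body.get('email', '')):
            failures.append({'itemIdentifier': record['messageId']})
    return {'batchItemFailures': failures}
```

Note that returning `batchItemFailures` only takes effect when `ReportBatchItemFailures` is enabled on the event source mapping.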

## Deployment

> **Review comment:** Likely to come up in @ellisms review, but we typically include a statement on users being responsible for spinning up / tearing down / paying for resources in their own AWS accounts. We'd also typically talk about having the AWS CLI installed and configured as a pre-requisite to using SAM. Take a look at some other SAM examples to get a feel for the norm.

```bash
# Build the SAM application
sam build

# Deploy the application
sam deploy --guided
```

## Key Features

> **Review comment:** These may make sense to you in the context of being the author, but somebody coming to this as a new idea will need each of these explaining.

- Automatic retry mechanism: failed messages are redriven from the DLQ back to the main queue once fixed
- Segregation of recoverable and fatal errors: fixable messages return to the main queue, while unfixable ones land in a fatal DLQ for human intervention
- Extensible processing logic: validation and fix rules can be extended to cover new failure scenarios

## API Reference
### Send Message

> **Review comment:** Good idea to include a copy of the API contract, but it would be nice to see commands that users can use in their terminal as an example, `curl https://${endpoint}/message...` with the relevant params to send a sample message.

```
POST /message
Content-Type: application/json
```
```json
{
  "messageType": "TYPE_A|TYPE_B|TYPE_C",
  "payload": {},
  "timestamp": "ISO8601_TIMESTAMP"
}
```
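To exercise this contract, a sample message can be sent with Python's standard library. The endpoint URL below is a placeholder; substitute the API endpoint that `sam deploy --guided` prints as a stack output.

```python
import json
import urllib.request

# Placeholder endpoint: substitute the API URL output by `sam deploy --guided`
ENDPOINT = "https://abc123.execute-api.us-east-1.amazonaws.com/Prod/message"

payload = {
    "messageType": "TYPE_A",
    "payload": {"email": "user@example.com"},
    "timestamp": "2024-01-01T00:00:00Z",
}

req = urllib.request.Request(
    ENDPOINT,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# Uncomment once the stack is deployed:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode())
```

An equivalent `curl` call would POST the same JSON body with a `Content-Type: application/json` header.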


## Adaptation Points

> **Review comment:** If you want to keep a terse list then you might need a sentence or two before the list to explain what you mean by Adaptation Points (earlier you called it extensible; pick one way of describing this concept).

- Message validation rules
- Processing logic
- Error handling strategies
- Recovery mechanisms
- Monitoring requirements
- API Design

## Note

> **Review comment:** What's this section, or is it a note to self that should have been deleted?

This is a sample pattern. Adapt security, scaling, and processing logic according to your requirements.
60 changes: 60 additions & 0 deletions serverless-message-processing/example-pattern.json

> **Review comment:** @zalilias you need to rename the file to be in line with the name of your pattern and replace the contents of the template / example with information about your pattern.

```json
{
  "title": "Step Functions to Athena",
  "description": "Create a Step Functions workflow to query Amazon Athena.",
  "language": "Python",
  "level": "200",
  "framework": "SAM",
  "introBox": {
    "headline": "How it works",
    "text": [
      "This sample project demonstrates how to use a serverless solution for processing and fixing malformed messages using SQS queues and Lambda functions.",
      "The system automatically handles message validation, applies fixes where possible, and routes messages to appropriate queues based on their fixability.",
      "With built-in error handling and detailed logging, it provides a robust framework for message processing that can be easily extended for specific business needs.",
      "This pattern uses AWS Lambda for processing and multiple SQS queues for message routing, and includes two dead-letter queues (DLQs) for messages requiring human intervention or auto-remediation."
    ]
  },
  "gitHub": {
    "template": {
      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/serverless-messaging-processing",
      "templateURL": "serverless-patterns/serverless-messaging-processing",
      "projectFolder": "serverless-messaging-processing",
      "templateFile": "template.yaml"
    }
  },
  "resources": {
    "bullets": [
      {
        "text": "Amazon SQS Docs",
        "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html"
      },
      {
        "text": "Using dead-letter queues in Amazon SQS",
        "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html"
      }
    ]
  },
  "deploy": {
    "text": [
      "sam build",
      "sam deploy --guided"
    ]
  },
  "testing": {
    "text": [
      "See the GitHub repo for detailed testing instructions."
    ]
  },
  "cleanup": {
    "text": [
      "Delete the stack: <code>sam delete</code>."
    ]
  },
  "authors": [
    {
      "name": "Ilias Ali",
      "image": "link-to-your-photo.jpg",
      "bio": "I am a Solutions Architect working at AWS based in the UK.",
      "linkedin": "ilias-ali-0849991a4"
    }
  ]
}
```
143 changes: 84 additions & 59 deletions serverless-message-processing/functions/decision_maker/app.py

> **Review comment:** I couldn't see where you were making use of the original failure message from the main queue to decide which branching logic to take. Using your "email fix" as an example:
>
> 1. Invalid email address provided in payload to API GW.
> 2. Message stored durably in SQS main queue.
> 3. Lambda function polls the queue and attempts to use that malformed email address with some imagined downstream service.
> 4. Downstream service returns an error due to the malformed email.
> 5. Your main Lambda poller would send a response to SQS for that message and give a failure reason (a shared enum of failures, maybe), in this example "MALFORMED_EMAIL".
> 6. When your decision maker picks that up, I presumed that it would switch through those failure reasons and apply the relevant fix.
>
> For this example it's fine to just use the email scenario as one example, with the view that the enum can be extended and more failure scenarios might be encountered.
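The flow the reviewer describes could be sketched roughly as follows. The names `FailureReason`, `FIXERS`, and `decide` are hypothetical, not part of the pattern's code; the point is a shared enum of failure reasons with a registry of remediations that the decision maker switches through.

```python
from enum import Enum

# Illustrative only: FailureReason and FIXERS are hypothetical names.
# The decision maker switches on a shared enum of failure reasons.
class FailureReason(Enum):
    MALFORMED_EMAIL = "MALFORMED_EMAIL"
    UNKNOWN = "UNKNOWN"

def fix_malformed_email(message):
    # Minimal remediation: strip whitespace from the email field
    message["email"] = message["email"].strip().replace(" ", "")
    return message

# Registry of remediations; extend as new failure scenarios are encountered
FIXERS = {FailureReason.MALFORMED_EMAIL: fix_malformed_email}

def decide(message):
    """Return the fixed message, or None if it belongs in the fatal DLQ."""
    reason = FailureReason(message.get("failureReason", "UNKNOWN"))
    fixer = FIXERS.get(reason)
    return fixer(message) if fixer else None
```

With this shape, adding a new failure scenario means adding one enum member and one entry in the registry, without touching the routing logic.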

```diff
@@ -1,4 +1,5 @@
 import json
+import re
 import boto3
 import os
 import logging
@@ -14,109 +15,133 @@
 MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL']
 FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL']
 
-def can_fix_message(message):
-    """
-    Determine if a message can be fixed automatically.
-
-    Extension points:
-    1. Add validation for specific message formats
-    2. Implement business-specific fix rules
-    3. Add data transformation logic
-    4. Implement retry strategies
-    5. Add validation against external systems
-    """
-    try:
-        # Basic message validation
-        # Add your validation logic here
-        return True
-    except Exception as e:
-        logger.error(f"Validation error: {str(e)}")
-        return False
+# Email validation pattern
+EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
+
+def fix_email(email):
+    """
+    Attempt to fix common email format issues.
+    Can be amended to cover other scenarios, e.g. downstream issues.
+    """
+    # Remove multiple @ symbols, keep the last one
+    if email.count('@') > 1:
+        parts = email.split('@')
+        email = f"{parts[0]}@{parts[-1]}"
+
+    # Remove spaces
+    email = email.strip().replace(' ', '')
+
+    # Fix common typos in domain extensions
+    common_fixes = {
+        '.con': '.com',
+        '.vom': '.com',
+        '.comm': '.com',
+        '.orgg': '.org',
+        '.nett': '.net'
+    }
+
+    for wrong, right in common_fixes.items():
+        if email.endswith(wrong):
+            email = email[:-len(wrong)] + right
+
+    return email
 
-def fix_message(message):
-    """
-    Apply fixes to the message.
-
-    Extension points:
-    1. Add data normalization
-    2. Implement field-specific fixes
-    3. Add data enrichment
-    4. Implement format conversion
-    5. Add validation rules
-    """
-    try:
-        fixed_message = message.copy()
-        # Add your fix logic here
-        fixed_message['wasFixed'] = True
-        return fixed_message
-    except Exception as e:
-        logger.error(f"Fix error: {str(e)}")
-        return None
+def can_fix_email(message):
+    """
+    Check if the email in the message can be fixed.
+    """
+    if 'email' not in message:
+        return False
+
+    email = message['email']
+    fixed_email = fix_email(email)
+
+    return bool(re.match(EMAIL_PATTERN, fixed_email))
 
 
 def lambda_handler(event, context):
     """
-    Process messages and route them based on fixability.
+    Processes messages from a DLQ that have already failed automatic processing,
+    attempts automated remediation, and redelivers the messages back to the main
+    queue. If no suitable fix can be applied, messages end up in a fatal DLQ,
+    where human intervention is typically required.
 
     Flow:
     1. Attempt to fix message
     2. If fixable -> Main Queue
```
> **Review comment:** This isn't always true, is it? If a message needs to be held for a period before being retried, this function would deliver the message to a "delay queue" and end its responsibility there. The delay queue is then responsible for delivering the message back to the main queue.

```diff
     3. If unfixable -> Fatal DLQ
 
     Extension points:
-    1. Add more sophisticated routing logic
+    1. Add more sophisticated routing logic, including a delay queue
     2. Implement custom error handling
     3. Add message transformation
     4. Implement retry mechanisms
     5. Add monitoring and metrics
     """
-
-    """
     processed_count = 0
 
     for record in event['Records']:
-        message_id = 'unknown'  # Initialize message_id with default value
+        original_message_id = 'unknown'  # Initialize with a default value
 
         try:
             # Parse the message body
             message = json.loads(record['body'])
-            message_id = record.get('messageId', 'unknown')
+            original_message_id = record.get('messageId', 'unknown')
 
-            logger.info(f"Processing message: {message_id}")
+            logger.info(f"Processing failed message: {original_message_id}")
 
-            if can_fix_message(message):
-                fixed_message = fix_message(message)
-                if fixed_message:
-                    # Send to main queue
-                    sqs.send_message(
-                        QueueUrl=MAIN_QUEUE_URL,
-                        MessageBody=json.dumps(fixed_message)
-                    )
-                    logger.info(f"Fixed message sent to main queue: {message_id}")
-                else:
-                    raise ValueError("Message fix failed")
-
+            # Option A: Try to fix malformed email
+            if can_fix_email(message) and not re.match(EMAIL_PATTERN, message['email']):
```
> **Review comment:** Your function can_fix_email() seems to itself make a call (line 57) to fix_email() and then you call the same function again on the next line (line 98). Why does it need to be called twice? Seems like you have functions with mixed responsibilities.

```diff
+                fixed_email = fix_email(message['email'])
+                logger.info(f"Fixed email from '{message['email']}' to '{fixed_email}'")
+
+                # Update the message with the fixed email
+                message['email'] = fixed_email
+                message['emailWasFixed'] = True
+
+                # Send back to main queue
+                sqs.send_message(
+                    QueueUrl=MAIN_QUEUE_URL,
+                    MessageBody=json.dumps(message)
+                )
+
+                logger.info(f"Sent fixed message back to main queue: {original_message_id}")
+
+            # Option B: Cannot fix - send to fatal DLQ
             else:
+                logger.warning(f"Message cannot be fixed, sending to fatal DLQ: {original_message_id}")
+
+                # Add failure reason if not present
+                if 'failureReason' not in message:
+                    message['failureReason'] = 'Unrecoverable error - could not fix message'
+
                 # Send to fatal DLQ
-                message['failureReason'] = 'Message cannot be automatically fixed'
                 sqs.send_message(
                     QueueUrl=FATAL_DLQ_URL,
                     MessageBody=json.dumps(message)
                 )
-                logger.warning(f"Message sent to fatal DLQ: {message_id}")
-
 
             processed_count += 1
 
         except Exception as e:
-            logger.error(f"Error processing message {message_id}: {str(e)}")
+            logger.error(f"Error processing message {original_message_id}: {str(e)}")
             # If we can't process the decision, send to fatal DLQ
             try:
                 error_message = {
                     'originalMessage': record['body'],
-                    'failureReason': str(e),
+                    'failureReason': f"Decision maker error: {str(e)}",
                     'timestamp': context.invoked_function_arn
                 }
                 sqs.send_message(
                     QueueUrl=FATAL_DLQ_URL,
                     MessageBody=json.dumps(error_message)
                 )
-                logger.error(f"Error message sent to fatal DLQ: {message_id}")
-
             except Exception as fatal_e:
-                logger.critical(f"Fatal DLQ error: {str(fatal_e)}")
+                logger.critical(f"Could not send to fatal DLQ: {str(fatal_e)}")
                 raise
 
     return {
```
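As a quick sanity check of the email-fixing behavior in this diff, `fix_email` can be exercised standalone (logic reproduced from the function above):

```python
# Standalone copy of the fix_email logic from the decision maker above
def fix_email(email):
    # Remove multiple @ symbols, keep the last one
    if email.count('@') > 1:
        parts = email.split('@')
        email = f"{parts[0]}@{parts[-1]}"
    # Remove spaces
    email = email.strip().replace(' ', '')
    # Fix common typos in domain extensions
    common_fixes = {'.con': '.com', '.vom': '.com', '.comm': '.com',
                    '.orgg': '.org', '.nett': '.net'}
    for wrong, right in common_fixes.items():
        if email.endswith(wrong):
            email = email[:-len(wrong)] + right
    return email

print(fix_email('user@@example.con'))  # → user@example.com
```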
```diff
@@ -1,2 +1 @@
 boto3==1.26.137
-jsonschema==4.17.3
```
```diff
@@ -1,2 +1 @@
 boto3==1.26.137
-jsonschema==4.17.3
```