From 5048d729ff704cf890c70967ea0d63d4d1ea8c11 Mon Sep 17 00:00:00 2001 From: ilias ali Date: Mon, 16 Dec 2024 16:34:43 +0000 Subject: [PATCH 1/2] New serverless pattern - Serverless Messaging Redrive --- .../functions/decision_maker/app.py | 127 +++++++++++++ .../functions/decision_maker/requirements.txt | 2 + .../functions/processor/app.py | 121 ++++++++++++ .../functions/processor/requirements.txt | 2 + serverless-message-processing/template.yaml | 175 ++++++++++++++++++ 5 files changed, 427 insertions(+) create mode 100644 serverless-message-processing/functions/decision_maker/app.py create mode 100644 serverless-message-processing/functions/decision_maker/requirements.txt create mode 100644 serverless-message-processing/functions/processor/app.py create mode 100644 serverless-message-processing/functions/processor/requirements.txt create mode 100644 serverless-message-processing/template.yaml diff --git a/serverless-message-processing/functions/decision_maker/app.py b/serverless-message-processing/functions/decision_maker/app.py new file mode 100644 index 000000000..a0e06d4fa --- /dev/null +++ b/serverless-message-processing/functions/decision_maker/app.py @@ -0,0 +1,127 @@ +import json +import boto3 +import os +import logging + +# Set up logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +# Initialize AWS clients +sqs = boto3.client('sqs') + +# Environment variables +MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL'] +FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL'] + +def can_fix_message(message): + """ + Determine if a message can be fixed automatically. + + Extension points: + 1. Add validation for specific message formats + 2. Implement business-specific fix rules + 3. Add data transformation logic + 4. Implement retry strategies + 5. 
Add validation against external systems + """ + try: + # Basic message validation + # Add your validation logic here + return True + except Exception as e: + logger.error(f"Validation error: {str(e)}") + return False + +def fix_message(message): + """ + Apply fixes to the message. + + Extension points: + 1. Add data normalization + 2. Implement field-specific fixes + 3. Add data enrichment + 4. Implement format conversion + 5. Add validation rules + """ + try: + fixed_message = message.copy() + # Add your fix logic here + fixed_message['wasFixed'] = True + return fixed_message + except Exception as e: + logger.error(f"Fix error: {str(e)}") + return None + +def lambda_handler(event, context): + """ + Process messages and route them based on fixability. + + Flow: + 1. Attempt to fix message + 2. If fixable -> Main Queue + 3. If unfixable -> Fatal DLQ + + Extension points: + 1. Add more sophisticated routing logic + 2. Implement custom error handling + 3. Add message transformation + 4. Implement retry mechanisms + 5. 
Add monitoring and metrics + """ + processed_count = 0 + + for record in event['Records']: + message_id = 'unknown' # Initialize message_id with default value + + try: + message = json.loads(record['body']) + message_id = record.get('messageId', 'unknown') + + logger.info(f"Processing message: {message_id}") + + if can_fix_message(message): + fixed_message = fix_message(message) + if fixed_message: + # Send to main queue + sqs.send_message( + QueueUrl=MAIN_QUEUE_URL, + MessageBody=json.dumps(fixed_message) + ) + logger.info(f"Fixed message sent to main queue: {message_id}") + else: + raise ValueError("Message fix failed") + else: + # Send to fatal DLQ + message['failureReason'] = 'Message cannot be automatically fixed' + sqs.send_message( + QueueUrl=FATAL_DLQ_URL, + MessageBody=json.dumps(message) + ) + logger.warning(f"Message sent to fatal DLQ: {message_id}") + + processed_count += 1 + + except Exception as e: + logger.error(f"Error processing message {message_id}: {str(e)}") + try: + error_message = { + 'originalMessage': record['body'], + 'failureReason': str(e), + 'timestamp': context.invoked_function_arn + } + sqs.send_message( + QueueUrl=FATAL_DLQ_URL, + MessageBody=json.dumps(error_message) + ) + logger.error(f"Error message sent to fatal DLQ: {message_id}") + except Exception as fatal_e: + logger.critical(f"Fatal DLQ error: {str(fatal_e)}") + raise + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'processedMessages': processed_count + }) + } diff --git a/serverless-message-processing/functions/decision_maker/requirements.txt b/serverless-message-processing/functions/decision_maker/requirements.txt new file mode 100644 index 000000000..9a3e84f98 --- /dev/null +++ b/serverless-message-processing/functions/decision_maker/requirements.txt @@ -0,0 +1,2 @@ +boto3==1.26.137 +jsonschema==4.17.3 diff --git a/serverless-message-processing/functions/processor/app.py b/serverless-message-processing/functions/processor/app.py new file mode 100644 index 
000000000..d4f744065 --- /dev/null +++ b/serverless-message-processing/functions/processor/app.py @@ -0,0 +1,121 @@ +import json +import logging + +# Set up logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def validate_message_structure(message): + """ + Validate message structure and required fields. + Args: + message: Dictionary containing message data + Returns: + bool: True if valid message structure, False otherwise + """ + required_fields = ['messageType', 'payload', 'timestamp'] + return all(field in message for field in required_fields) + +def process_message(message): + """ + Process the message content. + Args: + message: Dictionary containing message data + Returns: + bool: True if processing successful, False otherwise + """ + try: + # Validate message structure + if not validate_message_structure(message): + logger.error("Message missing required fields") + raise ValueError("Invalid message structure") + + message_type = message['messageType'] + payload = message['payload'] + + # Validate message type + valid_types = ['TYPE_A', 'TYPE_B', 'TYPE_C'] + if message_type not in valid_types: + logger.error(f"Invalid message type: {message_type}") + raise ValueError(f"Invalid message type: {message_type}") + + # Check for downstream system status + if 'systemStatus' in message and message['systemStatus'].lower() == 'unavailable': + logger.error("Target system is unavailable") + raise ValueError("DOWNSTREAM_ERROR: Target system unavailable") + + # Process the message based on type + logger.info(f"Processing message type: {message_type}") + + # Add type-specific processing logic here + if message_type == 'TYPE_A': + # Process TYPE_A messages + pass + elif message_type == 'TYPE_B': + # Process TYPE_B messages + pass + elif message_type == 'TYPE_C': + # Process TYPE_C messages + pass + + return True + + except Exception as e: + logger.error(f"Error processing message: {str(e)}") + raise + +def lambda_handler(event, context): + """ + Main 
Lambda handler function. + Args: + event: Lambda event object + context: Lambda context object + Returns: + dict: Response object + """ + logger.info(f"Processing {len(event['Records'])} messages") + + processed_count = 0 + failed_count = 0 + downstream_errors = 0 + + for record in event['Records']: + try: + # Parse the message body + message = json.loads(record['body']) + + # Process the message + if process_message(message): + processed_count += 1 + logger.info(f"Successfully processed message: {message.get('messageId', 'unknown')}") + else: + failed_count += 1 + logger.warning(f"Message processing returned False: {message.get('messageId', 'unknown')}") + + except json.JSONDecodeError as e: + failed_count += 1 + logger.error(f"Invalid JSON in message: {str(e)}") + raise + + except ValueError as e: + if "DOWNSTREAM_ERROR" in str(e): + downstream_errors += 1 + logger.error("Downstream error detected") + raise + failed_count += 1 + logger.error(f"Validation error: {str(e)}") + raise + + except Exception as e: + failed_count += 1 + logger.error(f"Unexpected error processing message: {str(e)}") + raise + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'processed': processed_count, + 'failed': failed_count, + 'downstream_errors': downstream_errors + }) + } diff --git a/serverless-message-processing/functions/processor/requirements.txt b/serverless-message-processing/functions/processor/requirements.txt new file mode 100644 index 000000000..9a3e84f98 --- /dev/null +++ b/serverless-message-processing/functions/processor/requirements.txt @@ -0,0 +1,2 @@ +boto3==1.26.137 +jsonschema==4.17.3 diff --git a/serverless-message-processing/template.yaml b/serverless-message-processing/template.yaml new file mode 100644 index 000000000..db640dafc --- /dev/null +++ b/serverless-message-processing/template.yaml @@ -0,0 +1,175 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: Serverless message processing architecture + 
+Resources: + ApiGateway: + Type: AWS::Serverless::Api + Properties: + StageName: prod + DefinitionBody: + swagger: '2.0' + info: + title: SQS API + version: '1.0' + schemes: + - https + paths: + /message: + post: + consumes: + - application/json + produces: + - application/json + responses: + '200': + description: OK + schema: + type: object + x-amazon-apigateway-integration: + credentials: !GetAtt ApiGatewayRole.Arn + type: aws + uri: !Sub "arn:aws:apigateway:${AWS::Region}:sqs:path/${AWS::AccountId}/${MainQueue.QueueName}" + httpMethod: POST + responses: + default: + statusCode: "200" + requestParameters: + integration.request.header.Content-Type: "'application/x-www-form-urlencoded'" + requestTemplates: + application/json: | + Action=SendMessage&MessageBody=$input.body + passthroughBehavior: never + integrationResponses: + '200': + statusCode: '200' + responseTemplates: + application/json: | + { + "message": "Message sent successfully", + "messageId": "$input.path('$.SendMessageResponse.SendMessageResult.MessageId')" + } + + ApiGatewayRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: apigateway.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: ApiGatewaySQSPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: sqs:SendMessage + Resource: !GetAtt MainQueue.Arn + + MainQueue: + Type: AWS::SQS::Queue + Properties: + RedrivePolicy: + deadLetterTargetArn: !GetAtt AutomatedDLQ.Arn + maxReceiveCount: 3 + VisibilityTimeout: 30 + MessageRetentionPeriod: 345600 + ReceiveMessageWaitTimeSeconds: 20 + + AutomatedDLQ: + Type: AWS::SQS::Queue + Properties: + MessageRetentionPeriod: 345600 + + + FatalDLQ: + Type: AWS::SQS::Queue + Properties: + MessageRetentionPeriod: 1209600 + + ProcessMessages: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + Runtime: python3.9 + CodeUri: ./functions/processor/ 
+ MemorySize: 128 + Timeout: 29 + Events: + SQSEvent: + Type: SQS + Properties: + Queue: !GetAtt MainQueue.Arn + BatchSize: 10 + Environment: + Variables: + MAIN_QUEUE_URL: !Ref MainQueue + FATAL_DLQ_URL: !Ref FatalDLQ + + DecisionMaker: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + Runtime: python3.9 + CodeUri: ./functions/decision_maker/ + MemorySize: 128 + Timeout: 29 + Events: + SQSEvent: + Type: SQS + Properties: + Queue: !GetAtt AutomatedDLQ.Arn + BatchSize: 10 + Policies: + - SQSPollerPolicy: + QueueName: !GetAtt MainQueue.QueueName + - SQSPollerPolicy: + QueueName: !GetAtt FatalDLQ.QueueName + - SQSSendMessagePolicy: + QueueName: !GetAtt FatalDLQ.QueueName + - SQSSendMessagePolicy: + QueueName: !GetAtt MainQueue.QueueName + Environment: + Variables: + MAIN_QUEUE_URL: !Ref MainQueue + FATAL_DLQ_URL: !Ref FatalDLQ + + + # CloudWatch Log Groups + ProcessMessagesLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${ProcessMessages}" + RetentionInDays: 14 + + DecisionMakerLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${DecisionMaker}" + RetentionInDays: 14 + + + +Outputs: + ApiEndpoint: + Description: API Gateway endpoint URL + Value: !Sub "https://${ApiGateway}.execute-api.${AWS::Region}.amazonaws.com/prod/message" + + MainQueueUrl: + Description: URL of the main SQS queue + Value: !Ref MainQueue + + MainQueueArn: + Description: ARN of the main SQS queue + Value: !GetAtt MainQueue.Arn + + AutomatedDLQUrl: + Description: URL of the automated DLQ + Value: !Ref AutomatedDLQ + + FatalDLQUrl: + Description: URL of the fatal DLQ + Value: !Ref FatalDLQ From 49e0acfc2d765fb79ab49f55f775869cf188dc7b Mon Sep 17 00:00:00 2001 From: ilias ali Date: Fri, 3 Jan 2025 16:54:14 +0000 Subject: [PATCH 2/2] New serverless pattern - Serverless Messaging Redrive- added improvements --- serverless-message-processing/README.md | 58 +++++++ .../example-pattern.json | 60 ++++++++ 
.../functions/decision_maker/app.py | 143 ++++++++++-------- .../functions/decision_maker/requirements.txt | 1 - .../functions/processor/requirements.txt | 1 - serverless-message-processing/template.yaml | 72 ++++++++- 6 files changed, 270 insertions(+), 65 deletions(-) create mode 100644 serverless-message-processing/README.md create mode 100644 serverless-message-processing/example-pattern.json diff --git a/serverless-message-processing/README.md b/serverless-message-processing/README.md new file mode 100644 index 000000000..4131b4360 --- /dev/null +++ b/serverless-message-processing/README.md @@ -0,0 +1,58 @@ +# Serverless Message Processing Pattern + +## Overview +An adaptable pattern for message processing using AWS serverless services, featuring error handling and automatic recovery mechanisms. + +## Core Components +- API Gateway (message ingestion) +- SQS Queues (main + DLQs) +- Lambda Functions (processing + recovery) + +## Basic Flow +1. Messages enter through API Gateway +2. Main queue receives messages +3. Processor Lambda handles messages +4. Failed messages route to DLQs +5. Decision maker attempts an automated recovery + +## Deployment +# Build the SAM application + ``` +sam build + ``` +# Deploy the application + ``` +sam deploy --guided + ``` + +## Key Features +- Automatic retry mechanism +- Segregation of recoverable/fatal errors +- Extensible processing logic + +## API Reference +# Send Message + ``` + +POST /message +Content-Type: application/json + ``` + ``` +{ + "messageType": "TYPE_A|TYPE_B|TYPE_C", + "payload": {}, + "timestamp": "ISO8601_TIMESTAMP" +} + ``` + + +## Adaptation Points +- Message validation rules +- Processing logic +- Error handling strategies +- Recovery mechanisms +- Monitoring requirements +- API Design + +## Note +This is a sample pattern. Adapt security, scaling, and processing logic according to your requirements. 
\ No newline at end of file diff --git a/serverless-message-processing/example-pattern.json b/serverless-message-processing/example-pattern.json new file mode 100644 index 000000000..00075d039 --- /dev/null +++ b/serverless-message-processing/example-pattern.json @@ -0,0 +1,60 @@ +{ + "title": "Serverless Messaging Redrive", + "description": "Automatically remediate and redrive failed SQS messages using Lambda functions and dead-letter queues.", + "language": "Python", + "level": "200", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This sample project demonstrates how to use a serverless solution for processing and fixing malformed messages using SQS queues and Lambda functions.", + "The system automatically handles message validation, applies fixes where possible, and routes messages to appropriate queues based on their fixability.", + "It has built-in error handling and detailed logging, and provides a robust framework for message processing that can be easily extended for specific business needs.", + "This pattern uses AWS Lambda for processing, multiple SQS queues for message routing, and includes two dead-letter queues (DLQs) for messages requiring human intervention or for auto-remediation."
 + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/serverless-message-processing", + "templateURL": "serverless-patterns/serverless-message-processing", + "projectFolder": "serverless-message-processing", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon SQS Docs", + "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html" + }, + { + "text": "Using dead-letter queues in Amazon SQS", + "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." + ] + }, + "authors": [ + { + "name": "Ilias Ali", + "image": "link-to-your-photo.jpg", + "bio": "I am a Solutions Architect working at AWS based in the UK.", + "linkedin": "ilias-ali-0849991a4" + } + ] + } \ No newline at end of file diff --git a/serverless-message-processing/functions/decision_maker/app.py b/serverless-message-processing/functions/decision_maker/app.py index a0e06d4fa..12539b221 100644 --- a/serverless-message-processing/functions/decision_maaker/app.py +++ b/serverless-message-processing/functions/decision_maker/app.py @@ -1,4 +1,5 @@ import json +import re import boto3 import os import logging @@ -14,109 +15,133 @@ MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL'] FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL'] -def can_fix_message(message): +# Email validation pattern +EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + +def fix_email(email): """ - Determine if a message can be fixed automatically. - - Extension points: - 1. Add validation for specific message formats - 2. Implement business-specific fix rules - 3. Add data transformation logic - 4. 
Implement retry strategies - 5. Add validation against external systems + Attempt to fix common email format issues + Can be amended to other scenarios e.g. Downstream issues """ - try: - # Basic message validation - # Add your validation logic here - return True - except Exception as e: - logger.error(f"Validation error: {str(e)}") - return False + # Remove multiple @ symbols, keep the last one + if email.count('@') > 1: + parts = email.split('@') + email = f"{parts[0]}@{parts[-1]}" + + # Remove spaces + email = email.strip().replace(' ', '') + + # Fix common typos in domain extensions + common_fixes = { + '.con': '.com', + '.vom': '.com', + '.comm': '.com', + '.orgg': '.org', + '.nett': '.net' + } + + for wrong, right in common_fixes.items(): + if email.endswith(wrong): + email = email[:-len(wrong)] + right + + return email -def fix_message(message): +def can_fix_email(message): """ - Apply fixes to the message. - - Extension points: - 1. Add data normalization - 2. Implement field-specific fixes - 3. Add data enrichment - 4. Implement format conversion - 5. Add validation rules + Check if the email in the message can be fixed """ - try: - fixed_message = message.copy() - # Add your fix logic here - fixed_message['wasFixed'] = True - return fixed_message - except Exception as e: - logger.error(f"Fix error: {str(e)}") - return None + if 'email' not in message: + return False + + email = message['email'] + fixed_email = fix_email(email) + + return bool(re.match(EMAIL_PATTERN, fixed_email)) + def lambda_handler(event, context): """ - Process messages and route them based on fixability. - + Processes messages from a DLQ that have already failed to be automatically processed, + and attempts automated remediation and redelivery of the messages back to the main queue. + If no suitable fixes can be applied, messages end up in a fatal DLQ where the typical + approach of human intervention is required. + Flow: 1. Attempt to fix message 2. If fixable -> Main Queue 3. 
If unfixable -> Fatal DLQ Extension points: - 1. Add more sophisticated routing logic + 1. Add more sophisticated routing logic- including a delay queue 2. Implement custom error handling 3. Add message transformation 4. Implement retry mechanisms 5. Add monitoring and metrics - """ + + """ processed_count = 0 for record in event['Records']: - message_id = 'unknown' # Initialize message_id with default value - try: + # Parse the message body message = json.loads(record['body']) - message_id = record.get('messageId', 'unknown') + original_message_id = record.get('messageId', 'unknown') - logger.info(f"Processing message: {message_id}") + logger.info(f"Processing failed message: {original_message_id}") - if can_fix_message(message): - fixed_message = fix_message(message) - if fixed_message: - # Send to main queue - sqs.send_message( - QueueUrl=MAIN_QUEUE_URL, - MessageBody=json.dumps(fixed_message) - ) - logger.info(f"Fixed message sent to main queue: {message_id}") - else: - raise ValueError("Message fix failed") + + + + # Option A: Try to fix malformed email + + if can_fix_email(message) and not re.match(EMAIL_PATTERN, message['email']): + fixed_email = fix_email(message['email']) + logger.info(f"Fixed email from '{message['email']}' to '{fixed_email}'") + + # Update the message with fixed email + message['email'] = fixed_email + message['emailWasFixed'] = True + + # Send back to main queue + sqs.send_message( + QueueUrl=MAIN_QUEUE_URL, + MessageBody=json.dumps(message) + ) + + logger.info(f"Sent fixed message back to main queue: {original_message_id}") + + # Option B: Cannot fix - send to fatal DLQ else: + logger.warning(f"Message cannot be fixed, sending to fatal DLQ: {original_message_id}") + + # Add failure reason if not present + if 'failureReason' not in message: + message['failureReason'] = 'Unrecoverable error - could not fix message' + # Send to fatal DLQ - message['failureReason'] = 'Message cannot be automatically fixed' sqs.send_message( 
QueueUrl=FATAL_DLQ_URL, MessageBody=json.dumps(message) ) - logger.warning(f"Message sent to fatal DLQ: {message_id}") - + processed_count += 1 except Exception as e: - logger.error(f"Error processing message {message_id}: {str(e)}") + logger.error(f"Error processing message {original_message_id}: {str(e)}") + # If we can't process the decision, send to fatal DLQ try: error_message = { 'originalMessage': record['body'], - 'failureReason': str(e), + 'failureReason': f"Decision maker error: {str(e)}", 'timestamp': context.invoked_function_arn } sqs.send_message( QueueUrl=FATAL_DLQ_URL, MessageBody=json.dumps(error_message) ) - logger.error(f"Error message sent to fatal DLQ: {message_id}") + except Exception as fatal_e: - logger.critical(f"Fatal DLQ error: {str(fatal_e)}") + logger.critical(f"Could not send to fatal DLQ: {str(fatal_e)}") raise return { diff --git a/serverless-message-processing/functions/decision_maker/requirements.txt b/serverless-message-processing/functions/decision_maker/requirements.txt index 9a3e84f98..08939dbfa 100644 --- a/serverless-message-processing/functions/decision_maker/requirements.txt +++ b/serverless-message-processing/functions/decision_maker/requirements.txt @@ -1,2 +1 @@ boto3==1.26.137 -jsonschema==4.17.3 diff --git a/serverless-message-processing/functions/processor/requirements.txt b/serverless-message-processing/functions/processor/requirements.txt index 9a3e84f98..08939dbfa 100644 --- a/serverless-message-processing/functions/processor/requirements.txt +++ b/serverless-message-processing/functions/processor/requirements.txt @@ -1,2 +1 @@ boto3==1.26.137 -jsonschema==4.17.3 diff --git a/serverless-message-processing/template.yaml b/serverless-message-processing/template.yaml index db640dafc..8b75233bc 100644 --- a/serverless-message-processing/template.yaml +++ b/serverless-message-processing/template.yaml @@ -2,6 +2,68 @@ AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Serverless 
message processing architecture +Parameters: + ProcessorMemorySize: + Type: Number + Default: 128 + Description: | + Memory allocation for the ProcessMessages Lambda function (MB). + Consider increasing this value if: + - Processing complex/large messages + - Performing CPU-intensive operations + - Seeing timeout errors or slow processing + - Handling concurrent message processing + Recommended ranges: + - Light processing: 128-256 MB + - Medium processing: 256-512 MB + - Heavy processing: 512-1024 MB + - Very heavy processing: 1024-2048 MB + MinValue: 128 + MaxValue: 10240 + + DecisionMakerMemorySize: + Type: Number + Default: 128 + Description: | + Memory allocation for the DecisionMaker Lambda function (MB). + Consider increasing this value if: + - Complex routing logic is implemented + - Processing message metadata + - Performing extensive error analysis + - Handling multiple concurrent decisions + Recommended ranges: + - Basic routing: 128-256 MB + - Complex routing: 256-512 MB + - Advanced analysis: 512-1024 MB + MinValue: 128 + MaxValue: 10240 + + ProcessorTimeout: + Type: Number + Default: 29 + Description: | + Timeout for the ProcessMessages Lambda function (seconds). + Consider increasing this value if: + - Processing takes longer than expected + - External API calls are involved + - Complex data transformations are needed + Note: Maximum value is 900 seconds (15 minutes) + MinValue: 1 + MaxValue: 900 + + DecisionMakerTimeout: + Type: Number + Default: 29 + Description: | + Timeout for the DecisionMaker Lambda function (seconds). 
+ Consider increasing this value if: + - Complex routing logic requires more time + - Multiple external service checks are needed + - Batch processing of failed messages + Note: Maximum value is 900 seconds (15 minutes) + MinValue: 1 + MaxValue: 900 + Resources: ApiGateway: Type: AWS::Serverless::Api @@ -96,8 +158,9 @@ Resources: Handler: app.lambda_handler Runtime: python3.9 CodeUri: ./functions/processor/ - MemorySize: 128 - Timeout: 29 + MemorySize: !Ref ProcessorMemorySize + Timeout: !Ref ProcessorTimeout + Events: SQSEvent: Type: SQS @@ -115,8 +178,9 @@ Resources: Handler: app.lambda_handler Runtime: python3.9 CodeUri: ./functions/decision_maker/ - MemorySize: 128 - Timeout: 29 + MemorySize: !Ref DecisionMakerMemorySize + Timeout: !Ref DecisionMakerTimeout + Events: SQSEvent: Type: SQS