Skip to content

Commit

Permalink
Merge pull request #2020 from anaghlal/anaghlal-feature-lambda-eventb…
Browse files Browse the repository at this point in the history
…ridge-sqs-lambda

New serverless pattern : Lambda event generation, their distribution via EventBridge rules to rate controlled processing using SQS+Lambda
  • Loading branch information
ellisms authored Jan 11, 2024
2 parents b580d42 + 3bed635 commit c83bd7f
Show file tree
Hide file tree
Showing 9 changed files with 616 additions and 0 deletions.
81 changes: 81 additions & 0 deletions lambda-eventbridge-sqs-lambda-cdk-python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Event generation, their distribution via Amazon EventBridge rules to rate controlled processing using Amazon SQS and AWS Lambda

This pattern contains a sample AWS Cloud Development Kit (AWS CDK) template for creating a Lambda function that posts AWS EventBridge events to the default domain bus. This CDK template defines EventBridge rules to distribute events based on rules to SQS queues and deploys a AWS Lambda function linked to each queue for event-type specific processing.

Learn more about this pattern at Serverless Land Patterns:: https://serverlessland.com/patterns/lambda-eventbridge-sqs-lambda-cdk-python

Important: This template uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

- [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
- [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
- [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) installed and configured

## Setup


## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```bash
git clone https://github.com/aws-samples/serverless-patterns
```

1.
```
cd lambda-eventbridge-sqs-lambda-cdk-python
```

Install the required dependencies (aws-cdk-lib and constructs) into your Python environment
```
python3 -m venv v-env
source v-env/bin/activate
pip install -r requirements.txt
```

1. From the command line, configure AWS CDK:
```bash
cdk bootstrap ACCOUNT-NUMBER/REGION # e.g.
cdk bootstrap 1111111111/us-east-1
cdk bootstrap --profile test 1111111111/us-east-1
```
1. From the command line, use AWS CDK to deploy the AWS resources for the pattern.
```bash
cdk deploy
```


## How it works

- The eventsGenerator lambda function publishes three events to the default bus. Each event has distinct id and distribution-mechanism-preference pair. The events are also stamped with source as "content-generator"
- Each message is evaluated against the rules defined in EventBridge. We have three rules each with a matching criteria of source matching "content-generator" and preferenceDistribution matching email|sftp|3papi respectively. (3papi refers to third-party API.)
- The rules are configured with targets as SQS queues that trigger Lambdas for processing. The SQS queues are configured with concurrency settings to match the rate of the distribution-channel. See details of the rate limiting feature in the docs [SQS Max Concurrency] (https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency)


## Testing

1. Open the AWS Lambda Console and locate the Lambda function printed in the stack output.
1. Create a new test event, accepting the default Event JSON. Choose **Invoke**.

1. This will send three events to EventBridge and each will be routed to either the email/sftp/3papi SQS queue. The consumer Lambda function on each queue will process the message. For each of the Lambda's you can check the CloudWatch logs to see the message received and confirm it matches one of email|sftp|3papi.



## Cleanup

1. Delete the stack
```bash
cdk destroy
```


## Documentation and useful references
- [Reducing custom code by using advanced rules in Amazon EventBridge](https://aws.amazon.com/blogs/compute/reducing-custom-code-by-using-advanced-rules-in-amazon-eventbridge/)
- [Use Amazon EventBridge to Build Decoupled, Event-Driven Architectures](https://serverlessland.com/learn/eventbridge/)

----
Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
138 changes: 138 additions & 0 deletions lambda-eventbridge-sqs-lambda-cdk-python/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#!/usr/bin/env python3
import os
import aws_cdk as cdk
import json

from aws_cdk import (
Duration,
Stack,
aws_iam as iam,
aws_lambda as lambda_,
aws_lambda_event_sources as event_sources,
aws_sqs as sqs_,
aws_events as events,
aws_events_targets as targets,
aws_logs as logs,
aws_iam as iam,
CfnOutput,
RemovalPolicy
)
from aws_solutions_constructs.aws_sqs_lambda import SqsToLambda
from constructs import Construct

DIRNAME = os.path.dirname(__file__)


class LambdaEventBridgeSQSLambda(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)


# Iam role to invoke lambda
lambda_cfn_role = iam.Role(
self,
"CfnRole",
assumed_by=iam.ServicePrincipal("events.amazonaws.com"),
)
lambda_cfn_role.add_managed_policy(
iam.ManagedPolicy.from_aws_managed_policy_name("AWSLambdaExecute")
)
lambdaLogGroup = logs.LogGroup(self,"EventGeneratorLogGroup",removal_policy=RemovalPolicy.DESTROY)
# Lambda function that generates and publishes events to the default EventBridge bus
lambda_function = lambda_.Function(
self,
"EventGenerator",
runtime=lambda_.Runtime.PYTHON_3_11,
handler="eventsGenerator.lambda_handler",
code=lambda_.Code.from_asset(os.path.join(DIRNAME, "src")),
timeout=Duration.minutes(1),
memory_size=512,
environment={
"environment": "dev",
},
log_group = lambdaLogGroup


)


# lambda version
version = lambda_function.current_version

# lambda policy
lambda_function.add_to_role_policy(
iam.PolicyStatement(
actions=[
"events:PutEvents",
],
resources=["*"],
)
)


# SQS queue
email_queue = sqs_.Queue(self, "EmailQueue")
sftp_queue = sqs_.Queue(self, "SftpQueue")
tpapi_queue = sqs_.Queue(self, "TpapiQueue")
dlq = sqs_.Queue(self, "DLQ")


# EventBridge Rule

email_rule = self.create_rule(email_queue,'email',dlq)
sftp_rule = self.create_rule(sftp_queue,'sftp',dlq)
tpapi_rule= self.create_rule(tpapi_queue,'3papi',dlq)


# Grant send messages to EventBridge
email_queue.grant_send_messages(iam.ServicePrincipal("events.amazonaws.com"))
sftp_queue.grant_send_messages(iam.ServicePrincipal("events.amazonaws.com"))
tpapi_queue.grant_send_messages(iam.ServicePrincipal("events.amazonaws.com"))
dlq.grant_send_messages(iam.ServicePrincipal("events.amazonaws.com"))

email_integration = self.sqs_to_lambda_integration(email_queue,'Email',5)
sftp_integration = self.sqs_to_lambda_integration(sftp_queue,'Sftp',2)
tpapi_integration = self.sqs_to_lambda_integration(tpapi_queue,'Tpapi',3)

# Outputs
CfnOutput(
self,
"Lambda function",
description="Lambda function",
value=lambda_function.function_arn
)

def sqs_to_lambda_integration(self, queue, preference,input_rate):
return SqsToLambda(self, preference+'OutboundProcessor',
lambda_function_props=lambda_.FunctionProps(
handler="eventProcessor.lambda_handler",
code=lambda_.Code.from_asset(os.path.join(DIRNAME, "src")),
runtime=lambda_.Runtime.PYTHON_3_11,
function_name=preference+'-processor',
),
existing_queue_obj= queue,
sqs_event_source_props=event_sources.SqsEventSourceProps(
batch_size=1,
max_concurrency = input_rate
)
)

def create_rule(self, queue, preference,dlq):
eventPattern = {
"source": [{
"equals-ignore-case": "content-generator"
}],
"detail.preferenceDistribution": [{
"equals-ignore-case": preference
}]
}
dlq_property = events.CfnRule.DeadLetterConfigProperty(arn=dlq.queue_arn)
target_queue = events.CfnRule.TargetProperty(arn= queue.queue_arn,id = preference+'-queue-target', dead_letter_config=dlq_property)
return events.CfnRule(self,preference+"-rule",event_pattern=eventPattern, targets=[target_queue])



app = cdk.App()
filestack = LambdaEventBridgeSQSLambda(app, "LambdaEventBridgeSQSLambda")

app.synth()
51 changes: 51 additions & 0 deletions lambda-eventbridge-sqs-lambda-cdk-python/cdk.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"app": "python app.py",
"watch": {
"include": [
"**"
],
"exclude": [
"README.md",
"cdk*.json",
"requirements*.txt",
"source.bat",
"**/__init__.py",
"python/__pycache__",
"tests"
]
},
"context": {
"@aws-cdk/aws-lambda:recognizeLayerVersion": true,
"@aws-cdk/core:checkSecretUsage": true,
"@aws-cdk/core:target-partitions": [
"aws",
"aws-cn"
],
"@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
"@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
"@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true,
"@aws-cdk/aws-iam:minimizePolicies": true,
"@aws-cdk/core:validateSnapshotRemovalPolicy": true,
"@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true,
"@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
"@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,
"@aws-cdk/aws-apigateway:disableCloudWatchRole": true,
"@aws-cdk/core:enablePartitionLiterals": true,
"@aws-cdk/aws-events:eventsTargetQueueSameAccount": true,
"@aws-cdk/aws-iam:standardizedServicePrincipals": true,
"@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true,
"@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true,
"@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true,
"@aws-cdk/aws-route53-patters:useCertificate": true,
"@aws-cdk/customresources:installLatestAwsSdkDefault": false,
"@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true,
"@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true,
"@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true,
"@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true,
"@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true,
"@aws-cdk/aws-redshift:columnId": true,
"@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true,
"@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true,
"@aws-cdk/aws-apigateway:requestValidatorUniqueId": true
}
}
Loading

0 comments on commit c83bd7f

Please sign in to comment.