From 041776eba009ca6e92938592db41d24c23efa583 Mon Sep 17 00:00:00 2001 From: Edward Schaefer Date: Fri, 22 Dec 2023 10:28:17 -0500 Subject: [PATCH 1/6] added Firehose Transformation pattern (CDK + Typescript) --- .../README.md | 72 +++++++++++ .../example-pattern.json | 65 ++++++++++ .../src/.gitignore | 8 ++ .../src/.npmignore | 6 + .../src/README.md | 14 +++ .../src/bin/firehose-lambda-transform.ts | 9 ++ .../src/cdk.json | 63 ++++++++++ .../src/jest.config.js | 8 ++ .../src/lambda/index.ts | 31 +++++ .../src/lib/firehose-lambda-stack.ts | 115 ++++++++++++++++++ .../src/package.json | 28 +++++ .../src/tsconfig.json | 31 +++++ 12 files changed, 450 insertions(+) create mode 100644 firehose-transformation-cdk-typescript/README.md create mode 100644 firehose-transformation-cdk-typescript/example-pattern.json create mode 100644 firehose-transformation-cdk-typescript/src/.gitignore create mode 100644 firehose-transformation-cdk-typescript/src/.npmignore create mode 100644 firehose-transformation-cdk-typescript/src/README.md create mode 100644 firehose-transformation-cdk-typescript/src/bin/firehose-lambda-transform.ts create mode 100644 firehose-transformation-cdk-typescript/src/cdk.json create mode 100644 firehose-transformation-cdk-typescript/src/jest.config.js create mode 100644 firehose-transformation-cdk-typescript/src/lambda/index.ts create mode 100644 firehose-transformation-cdk-typescript/src/lib/firehose-lambda-stack.ts create mode 100644 firehose-transformation-cdk-typescript/src/package.json create mode 100644 firehose-transformation-cdk-typescript/src/tsconfig.json diff --git a/firehose-transformation-cdk-typescript/README.md b/firehose-transformation-cdk-typescript/README.md new file mode 100644 index 000000000..65b675c3b --- /dev/null +++ b/firehose-transformation-cdk-typescript/README.md @@ -0,0 +1,72 @@ +# Amazon Kinesis Data Firehose Data Transformation with AWS Lambda + +This pattern deploys a Kinesis Data Firehose Delivery Stream that invokes a 
Lambda function to transform incoming source data and delivers the transformed data to a destination Amazon S3 bucket. + +Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/firehose-transformation-cdk-typescript](https://serverlessland.com/patterns/firehose-transformation-cdk-typescript) + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) installed and configured + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd serverless-patterns/firehose-transformation-cdk-typescript/src + ``` +1. Install dependencies: + ```bash + npm install + ``` +1. Bootstrap environment (if you have not done so already) + ``` + cdk bootstrap + ``` +1. Deploy the stack to your default AWS account and region. + ``` + cdk deploy + ``` + +## How it works + +Kinesis Data Firehose can invoke a Lambda function to transform incoming source data and deliver the transformed data to destinations. 
In this architecture, Kinesis Data Firehose invokes the specified Lambda function asynchronously with each buffered batch using the AWS Lambda synchronous invocation mode. The transformed data is sent from Lambda to Kinesis Data Firehose. Kinesis Data Firehose then sends it to the destination S3 bucket when the specified destination buffering size or buffering interval is reached, whichever happens first. + +## Testing + +1. Open the Kinesis Data Firehose console at https://console.aws.amazon.com/firehose/ + +2. Choose the {stack-name}-firehoststream-{stream-id} delivery stream + +3. Under **Test with demo data**, choose **Start sending demo data** to generate sample stock ticker data. + +4. After a few seconds, choose **Stop sending demo data** + +5. Verify that test events are being sent to the destination S3 bucket. Note that it might take a few minutes for new objects to appear in the bucket, based on the buffering configuration. + +``` +aws s3 ls s3://{destination_bucket_name} --recursive --human-readable --summarize +``` + +Or nagivate to the S3 console and manually verify that the demo data has been sent to S3 + +## Cleanup + +Run the following command to delete the resources + +```bash +cdk destroy +``` + +---- +Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ +SPDX-License-Identifier: MIT-0 \ No newline at end of file diff --git a/firehose-transformation-cdk-typescript/example-pattern.json b/firehose-transformation-cdk-typescript/example-pattern.json new file mode 100644 index 000000000..c13da76d4 --- /dev/null +++ b/firehose-transformation-cdk-typescript/example-pattern.json @@ -0,0 +1,65 @@ +{ + "title": "Kinesis Firehose Data Transformation with Lambda", + "description": "Transform incoming source data and deliver the transformed data to destinations.", + "language": "typescript", + "level": "200", + "framework": "CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to transform streaming data received by Kinesis Data Firehose using AWS Lambda before delivering the transformed data to Amazon S3.", + "To transform incoming source data, Kinesis Data Firehose invokes the specified Lambda function asynchronously with each buffered batch using the AWS Lambda synchronous invocation mode. The transformed data is sent from Lambda to Kinesis Data Firehose. Kinesis Data Firehose then sends it to the destination S3 bucket when the specified destination buffering size or buffering interval is reached, whichever happens first." 
+ ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/firehose-transformation-cdk-typescript", + "templateURL": "serverless-patterns/firehose-transformation-cdk-typescript", + "projectFolder": "firehose-transformation-cdk-typescript", + "templateFile": "firehose-transformation-cdk-typescript/src/lib/firehost-lambda-transform.ts" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon Kinesis Data Firehose Data Transformation", + "link": "https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html" + }, + { + "text": "Using AWS Lambda with Amazon Kinesis Data Firehose", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html" + }, + { + "text": "Ingesting enriched IoT data into Amazon S3 using Amazon Kinesis Data Firehose", + "link": "https://aws.amazon.com/blogs/iot/ingesting-enriched-iot-data-into-amazon-s3-using-amazon-kinesis-data-firehose/" + }, + { + "text": "Capture clickstream data using AWS serverless services", + "link": "https://aws.amazon.com/blogs/industries/capture-clickstream-data-using-aws-serverless-services/" + } + ] + }, + "deploy": { + "text": [ + "cdk deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." 
+ ] + }, + "cleanup": { + "text": [ + "cdk destroy" + ] + }, + "authors": [ + { + "name": "Edward Schaefer", + "image": "https://d2siip5gg18ho0.cloudfront.net/images/schaeedw-photo-centered_250x250.jpg", + "bio": "Solutions Architect @ Amazon Web Services", + "linkedin": "ejschaefer" + } + ] +} diff --git a/firehose-transformation-cdk-typescript/src/.gitignore b/firehose-transformation-cdk-typescript/src/.gitignore new file mode 100644 index 000000000..f60797b6a --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/.gitignore @@ -0,0 +1,8 @@ +*.js +!jest.config.js +*.d.ts +node_modules + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/firehose-transformation-cdk-typescript/src/.npmignore b/firehose-transformation-cdk-typescript/src/.npmignore new file mode 100644 index 000000000..c1d6d45dc --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/.npmignore @@ -0,0 +1,6 @@ +*.ts +!*.d.ts + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/firehose-transformation-cdk-typescript/src/README.md b/firehose-transformation-cdk-typescript/src/README.md new file mode 100644 index 000000000..320efc02a --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/README.md @@ -0,0 +1,14 @@ +# Welcome to your CDK TypeScript project + +This is a blank project for CDK development with TypeScript. + +The `cdk.json` file tells the CDK Toolkit how to execute your app. 
+ +## Useful commands + +* `npm run build` compile typescript to js +* `npm run watch` watch for changes and compile +* `npm run test` perform the jest unit tests +* `cdk deploy` deploy this stack to your default AWS account/region +* `cdk diff` compare deployed stack with current state +* `cdk synth` emits the synthesized CloudFormation template diff --git a/firehose-transformation-cdk-typescript/src/bin/firehose-lambda-transform.ts b/firehose-transformation-cdk-typescript/src/bin/firehose-lambda-transform.ts new file mode 100644 index 000000000..bfc945a68 --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/bin/firehose-lambda-transform.ts @@ -0,0 +1,9 @@ +#!/usr/bin/env node +import 'source-map-support/register'; +import * as cdk from 'aws-cdk-lib'; +import { FirehoseLambdaStack } from '../lib/firehose-lambda-stack'; + +const app = new cdk.App(); +new FirehoseLambdaStack(app, 'FirehoseLambdaStack', { + +}); \ No newline at end of file diff --git a/firehose-transformation-cdk-typescript/src/cdk.json b/firehose-transformation-cdk-typescript/src/cdk.json new file mode 100644 index 000000000..2cb679c76 --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/cdk.json @@ -0,0 +1,63 @@ +{ + "app": "npx ts-node --prefer-ts-exts bin/firehose-lambda-transform.ts", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "**/*.d.ts", + "**/*.js", + "tsconfig.json", + "package*.json", + "yarn.lock", + "node_modules", + "test" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + 
"@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-iam:standardizedServicePrincipals": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + "@aws-cdk/aws-efs:denyAnonymousAccess": true, + "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true, + "@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true, + "@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true, + "@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true, + "@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true, + "@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true + } +} 
diff --git a/firehose-transformation-cdk-typescript/src/jest.config.js b/firehose-transformation-cdk-typescript/src/jest.config.js new file mode 100644 index 000000000..08263b895 --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/jest.config.js @@ -0,0 +1,8 @@ +module.exports = { + testEnvironment: 'node', + roots: ['/test'], + testMatch: ['**/*.test.ts'], + transform: { + '^.+\\.tsx?$': 'ts-jest' + } +}; diff --git a/firehose-transformation-cdk-typescript/src/lambda/index.ts b/firehose-transformation-cdk-typescript/src/lambda/index.ts new file mode 100644 index 000000000..bc30019ae --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/lambda/index.ts @@ -0,0 +1,31 @@ +import { + Context, + FirehoseTransformationResult, + FirehoseTransformationResultRecord +} from 'aws-lambda'; +import { FirehoseTransformationEvent } from 'aws-lambda/trigger/kinesis-firehose-transformation' + +export const handler = async (event: FirehoseTransformationEvent, context: Context): Promise => { + + console.log(`Event: ${JSON.stringify(event, null, 2)}`); + console.log(`Context: ${JSON.stringify(context, null, 2)}`); + const records: FirehoseTransformationResultRecord[] = [] + + for (const record of event.records) { + /* This transformation is the "identity" transformation, the data is left intact */ + records.push({ + recordId: record.recordId, + result: 'Ok', + data: record.data, + }); + } + + + console.log(`Processing completed. 
Successful records ${records.length}.`); + console.log(`Results: ${JSON.stringify(records)}`) + + return { + records: records + }; + +}; diff --git a/firehose-transformation-cdk-typescript/src/lib/firehose-lambda-stack.ts b/firehose-transformation-cdk-typescript/src/lib/firehose-lambda-stack.ts new file mode 100644 index 000000000..747f632e1 --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/lib/firehose-lambda-stack.ts @@ -0,0 +1,115 @@ +import * as cdk from 'aws-cdk-lib'; +import { Construct } from 'constructs'; +import { + aws_iam as iam, + aws_s3 as s3, + aws_kinesisfirehose as firehose, + aws_lambda_nodejs as lambda, + aws_lambda as lambda_, +} from 'aws-cdk-lib'; + + +export class FirehoseLambdaStack extends cdk.Stack { + constructor(scope: Construct, id: string, props?: cdk.StackProps) { + super(scope, id, props); + + const lambda_role = new iam.Role(this, 'firehose-lambda-role', { + assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com') + }); + + lambda_role.addManagedPolicy( + iam.ManagedPolicy.fromAwsManagedPolicyName( + "service-role/AWSLambdaBasicExecutionRole" + ) + ); + + lambda_role.addToPolicy( + new iam.PolicyStatement({ + resources: ['*'], + actions: [ + "firehose:DescribeDeliveryStream", + "firehose:PutRecord", + "firehose:StartDeliveryStreamEncryption", + "firehose:PutRecordBatch", + "firehose:ListDeliveryStreams" + ], + effect: iam.Effect.ALLOW + }) + ); + + const lambdaFn = new lambda.NodejsFunction(this, 'firehose-lambda-function', { + entry: 'lambda/index.ts', + handler: 'handler', + runtime: lambda_.Runtime.NODEJS_18_X, + architecture: lambda_.Architecture.X86_64, + role: lambda_role, + timeout: cdk.Duration.seconds(60) + }); + + + const bucket = new s3.Bucket(this, 'firehost-destination-bucket', { + encryption: s3.BucketEncryption.S3_MANAGED, + blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL + }); + + const firehose_role = new iam.Role(this, 'firehose-role', { + assumedBy: new 
iam.ServicePrincipal('firehose.amazonaws.com') + }); + + firehose_role.addToPolicy( + new iam.PolicyStatement({ + resources: [bucket.bucketArn, bucket.bucketArn + '/*'], + actions: ['s3:AbortMultipartUpload', 's3:GetBucketLocation', 's3:GetObject', 's3:ListBucket', 's3:ListBucketMultipartUploads', 's3:PutObject'], + effect: iam.Effect.ALLOW + }) + ); + + firehose_role.addToPolicy( + new iam.PolicyStatement({ + resources: [lambdaFn.functionArn], + actions: ['lambda:InvokeFunction', 'lambda:GetFunctionConfiguration'], + effect: iam.Effect.ALLOW + }) + ); + firehose_role.addToPolicy( + new iam.PolicyStatement({ + resources: ['*'], + actions: ['logs:*'], + effect: iam.Effect.ALLOW + }) + ); + + const firehose_delivery_stream = new firehose.CfnDeliveryStream(this, 'firehose-stream', { + deliveryStreamType: 'DirectPut', + extendedS3DestinationConfiguration: { + bucketArn: bucket.bucketArn, + bufferingHints: { + intervalInSeconds: 60, + sizeInMBs: 1 + }, + roleArn: firehose_role.roleArn, + processingConfiguration: { + enabled: true, + processors: [{ + type: 'Lambda', + parameters: [{ + parameterName: 'LambdaArn', + parameterValue: lambdaFn.functionArn + }] + }] + }, + encryptionConfiguration: { + noEncryptionConfig: 'NoEncryption' + } + } + } + ); + + + new cdk.CfnOutput(this, "S3 Destination Bucket", { + value: bucket.bucketName, + description: "S3 Destination Bucket" + }); + + } +} diff --git a/firehose-transformation-cdk-typescript/src/package.json b/firehose-transformation-cdk-typescript/src/package.json new file mode 100644 index 000000000..9b34d62fa --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/package.json @@ -0,0 +1,28 @@ +{ + "name": "firehose-lambda-transform", + "version": "0.1.0", + "bin": { + "cdk": "bin/firehose-lambda-transform.ts" + }, + "scripts": { + "build": "tsc", + "watch": "tsc -w", + "test": "jest", + "cdk": "cdk" + }, + "devDependencies": { + "@types/jest": "^29.5.5", + "@types/node": "20.7.1", + "aws-cdk": "2.102.0", + "jest": 
"^29.7.0", + "ts-jest": "^29.1.1", + "ts-node": "^10.9.1", + "typescript": "~5.2.2" + }, + "dependencies": { + "@types/aws-lambda": "^8.10.130", + "aws-cdk-lib": "2.102.0", + "constructs": "^10.0.0", + "source-map-support": "^0.5.21" + } +} diff --git a/firehose-transformation-cdk-typescript/src/tsconfig.json b/firehose-transformation-cdk-typescript/src/tsconfig.json new file mode 100644 index 000000000..aaa7dc510 --- /dev/null +++ b/firehose-transformation-cdk-typescript/src/tsconfig.json @@ -0,0 +1,31 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": [ + "es2020", + "dom" + ], + "declaration": true, + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "noImplicitThis": true, + "alwaysStrict": true, + "noUnusedLocals": false, + "noUnusedParameters": false, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": false, + "inlineSourceMap": true, + "inlineSources": true, + "experimentalDecorators": true, + "strictPropertyInitialization": false, + "typeRoots": [ + "./node_modules/@types" + ] + }, + "exclude": [ + "node_modules", + "cdk.out" + ] +} From 293a5320026d1c0d7904419670c37b336dbbfd77 Mon Sep 17 00:00:00 2001 From: Edward Schaefer Date: Fri, 22 Dec 2023 11:23:04 -0500 Subject: [PATCH 2/6] example-pattern.json corrections --- firehose-transformation-cdk-typescript/example-pattern.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/firehose-transformation-cdk-typescript/example-pattern.json b/firehose-transformation-cdk-typescript/example-pattern.json index c13da76d4..1cd832eca 100644 --- a/firehose-transformation-cdk-typescript/example-pattern.json +++ b/firehose-transformation-cdk-typescript/example-pattern.json @@ -1,7 +1,7 @@ { "title": "Kinesis Firehose Data Transformation with Lambda", "description": "Transform incoming source data and deliver the transformed data to destinations.", - "language": "typescript", + "language": "TypeScript", "level": "200", "framework": "CDK", 
"introBox": { @@ -16,7 +16,7 @@ "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/firehose-transformation-cdk-typescript", "templateURL": "serverless-patterns/firehose-transformation-cdk-typescript", "projectFolder": "firehose-transformation-cdk-typescript", - "templateFile": "firehose-transformation-cdk-typescript/src/lib/firehost-lambda-transform.ts" + "templateFile": "firehose-transformation-cdk-typescript/src/lib/firehost-lambda-transform" } }, "resources": { From b3d34fd4a13b1d5b15729205ef55a1b049c8b4d9 Mon Sep 17 00:00:00 2001 From: Edward Schaefer Date: Fri, 22 Dec 2023 11:27:18 -0500 Subject: [PATCH 3/6] example-pattern.json template file correction --- firehose-transformation-cdk-typescript/example-pattern.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firehose-transformation-cdk-typescript/example-pattern.json b/firehose-transformation-cdk-typescript/example-pattern.json index 1cd832eca..e3522fb0c 100644 --- a/firehose-transformation-cdk-typescript/example-pattern.json +++ b/firehose-transformation-cdk-typescript/example-pattern.json @@ -16,7 +16,7 @@ "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/firehose-transformation-cdk-typescript", "templateURL": "serverless-patterns/firehose-transformation-cdk-typescript", "projectFolder": "firehose-transformation-cdk-typescript", - "templateFile": "firehose-transformation-cdk-typescript/src/lib/firehost-lambda-transform" + "templateFile": "src/lib/firehose-lambda-stack.ts" } }, "resources": { From 43c126b6812e04ebea4be85afb29b7044b608f61 Mon Sep 17 00:00:00 2001 From: Edward Schaefer Date: Wed, 10 Jan 2024 11:08:46 -0500 Subject: [PATCH 4/6] resolving PR feedback; removing jest, add CW logs, update README, update transformation --- .../README.md | 20 ++++---- .../example-pattern.json | 2 +- .../src/README.md | 14 ------ .../src/jest.config.js | 8 --- .../src/lambda/index.ts | 17 ++++++- .../src/lib/firehose-lambda-stack.ts | 49 
+++++++++++++++++-- .../src/package.json | 3 -- 7 files changed, 71 insertions(+), 42 deletions(-) delete mode 100644 firehose-transformation-cdk-typescript/src/README.md delete mode 100644 firehose-transformation-cdk-typescript/src/jest.config.js diff --git a/firehose-transformation-cdk-typescript/README.md b/firehose-transformation-cdk-typescript/README.md index 65b675c3b..da4a6670e 100644 --- a/firehose-transformation-cdk-typescript/README.md +++ b/firehose-transformation-cdk-typescript/README.md @@ -1,8 +1,10 @@ # Amazon Kinesis Data Firehose Data Transformation with AWS Lambda -This pattern deploys a Kinesis Data Firehose Delivery Stream that invokes a Lambda function to transform incoming source data and delivers the transformed data to a destination Amazon S3 bucket. +This pattern demonstrates how to transform streaming data received by Amazon Kinesis Data Firehose using AWS Lambda before delivering the transformed data to Amazon S3. -Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/firehose-transformation-cdk-typescript](https://serverlessland.com/patterns/firehose-transformation-cdk-typescript) +The pattern uses the AWS Cloud Development Kit (AWS CDK) to deploy a Kinesis Data Firehose delivery stream, a Lambda function to transform source data, and an Amazon S3 bucket to receive the transformed data. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/firehose-transformation-cdk-typescript Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. @@ -16,7 +18,7 @@ Important: this application uses various AWS services and there are costs associ ## Deployment Instructions 1. 
Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: - ``` + ``` git clone https://github.com/aws-samples/serverless-patterns ``` 1. Change directory to the pattern directory: @@ -24,7 +26,7 @@ Important: this application uses various AWS services and there are costs associ cd serverless-patterns/firehose-transformation-cdk-typescript/src ``` 1. Install dependencies: - ```bash + ``` npm install ``` 1. Bootstrap environment (if you have not done so already) @@ -44,7 +46,7 @@ Kinesis Data Firehose can invoke a Lambda function to transform incoming source 1. Open the Kinesis Data Firehose console at https://console.aws.amazon.com/firehose/ -2. Choose the {stack-name}-firehoststream-{stream-id} delivery stream +2. Choose the {stack-name}-firehosestream-{stream-id} delivery stream 3. Under **Test with demo data**, choose **Start sending demo data** to generate sample stock ticker data. @@ -52,11 +54,11 @@ Kinesis Data Firehose can invoke a Lambda function to transform incoming source 5. Verify that test events are being sent to the destination S3 bucket. Note that it might take a few minutes for new objects to appear in the bucket, based on the buffering configuration. 
+ -``` -aws s3 ls s3://{destination_bucket_name} --recursive --human-readable --summarize -``` + ``` + aws s3 ls s3://{destination_bucket_name} --recursive --human-readable --summarize + ``` -Or nagivate to the S3 console and manually verify that the demo data has been sent to S3 + Or navigate to the S3 console and manually verify that the demo data has been sent to S3 ## Cleanup diff --git a/firehose-transformation-cdk-typescript/example-pattern.json b/firehose-transformation-cdk-typescript/example-pattern.json index e3522fb0c..875b55592 100644 --- a/firehose-transformation-cdk-typescript/example-pattern.json +++ b/firehose-transformation-cdk-typescript/example-pattern.json @@ -1,6 +1,6 @@ { "title": "Kinesis Firehose Data Transformation with Lambda", - "description": "Transform incoming source data and deliver the transformed data to destinations.", + "description": "Transform incoming source data and deliver the transformed data to S3.", "language": "TypeScript", "level": "200", "framework": "CDK", diff --git a/firehose-transformation-cdk-typescript/src/README.md b/firehose-transformation-cdk-typescript/src/README.md deleted file mode 100644 index 320efc02a..000000000 --- a/firehose-transformation-cdk-typescript/src/README.md +++ /dev/null @@ -1,14 +0,0 @@ -# Welcome to your CDK TypeScript project - -This is a blank project for CDK development with TypeScript. - -The `cdk.json` file tells the CDK Toolkit how to execute your app. 
- -## Useful commands - -* `npm run build` compile typescript to js -* `npm run watch` watch for changes and compile -* `npm run test` perform the jest unit tests -* `cdk deploy` deploy this stack to your default AWS account/region -* `cdk diff` compare deployed stack with current state -* `cdk synth` emits the synthesized CloudFormation template diff --git a/firehose-transformation-cdk-typescript/src/jest.config.js b/firehose-transformation-cdk-typescript/src/jest.config.js deleted file mode 100644 index 08263b895..000000000 --- a/firehose-transformation-cdk-typescript/src/jest.config.js +++ /dev/null @@ -1,8 +0,0 @@ -module.exports = { - testEnvironment: 'node', - roots: ['/test'], - testMatch: ['**/*.test.ts'], - transform: { - '^.+\\.tsx?$': 'ts-jest' - } -}; diff --git a/firehose-transformation-cdk-typescript/src/lambda/index.ts b/firehose-transformation-cdk-typescript/src/lambda/index.ts index bc30019ae..4e926fb01 100644 --- a/firehose-transformation-cdk-typescript/src/lambda/index.ts +++ b/firehose-transformation-cdk-typescript/src/lambda/index.ts @@ -4,6 +4,7 @@ import { FirehoseTransformationResultRecord } from 'aws-lambda'; import { FirehoseTransformationEvent } from 'aws-lambda/trigger/kinesis-firehose-transformation' +import { Buffer } from 'buffer'; export const handler = async (event: FirehoseTransformationEvent, context: Context): Promise => { @@ -12,11 +13,23 @@ export const handler = async (event: FirehoseTransformationEvent, context: Conte const records: FirehoseTransformationResultRecord[] = [] for (const record of event.records) { - /* This transformation is the "identity" transformation, the data is left intact */ + const recordData = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8')); + + /** do some record transformation here */ + + /** + * This example assumes source data is similar to Kinesis Data Firehose console demo data + * simulated stock ticker data: https://docs.aws.amazon.com/firehose/latest/dev/test-drive-firehose.html 
+ * */ + + const oldPrice = recordData["PRICE"] - recordData["CHANGE"]; + recordData["CUSTOM_RECORDID"] = record.recordId + recordData["CUSTOM_OLDPRICE"] = oldPrice.toFixed(2) + records.push({ recordId: record.recordId, result: 'Ok', - data: record.data, + data: Buffer.from(JSON.stringify(recordData), 'utf-8').toString('base64') }); } diff --git a/firehose-transformation-cdk-typescript/src/lib/firehose-lambda-stack.ts b/firehose-transformation-cdk-typescript/src/lib/firehose-lambda-stack.ts index 747f632e1..88479fa9a 100644 --- a/firehose-transformation-cdk-typescript/src/lib/firehose-lambda-stack.ts +++ b/firehose-transformation-cdk-typescript/src/lib/firehose-lambda-stack.ts @@ -3,6 +3,7 @@ import { Construct } from 'constructs'; import { aws_iam as iam, aws_s3 as s3, + aws_logs as logs, aws_kinesisfirehose as firehose, aws_lambda_nodejs as lambda, aws_lambda as lambda_, @@ -12,7 +13,7 @@ import { export class FirehoseLambdaStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); - + const lambda_role = new iam.Role(this, 'firehose-lambda-role', { assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com') }); @@ -52,13 +53,30 @@ export class FirehoseLambdaStack extends cdk.Stack { blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL }); + const fhLogGroup = new logs.LogGroup(this, 'firehose-log-group', { + retention: logs.RetentionDays.ONE_WEEK + }); + + const fhLogStreamS3 = new logs.LogStream(this, 'firehose-log-stream-s3', { + logGroup: fhLogGroup, + logStreamName: 'S3Delivery' + }); + + const fhLogStreamS3Backup = new logs.LogStream(this, 'firehose-log-stream-s3-backup', { + logGroup: fhLogGroup, + logStreamName: 'BackupDelivery' + }); + const firehose_role = new iam.Role(this, 'firehose-role', { assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com') }); firehose_role.addToPolicy( new iam.PolicyStatement({ - resources: [bucket.bucketArn, bucket.bucketArn + '/*'], + resources: [ + 
bucket.bucketArn, + `${bucket.bucketArn}/*` + ], actions: ['s3:AbortMultipartUpload', 's3:GetBucketLocation', 's3:GetObject', 's3:ListBucket', 's3:ListBucketMultipartUploads', 's3:PutObject'], effect: iam.Effect.ALLOW }) @@ -71,10 +89,14 @@ export class FirehoseLambdaStack extends cdk.Stack { effect: iam.Effect.ALLOW }) ); + firehose_role.addToPolicy( new iam.PolicyStatement({ - resources: ['*'], - actions: ['logs:*'], + resources: [ + `${fhLogGroup.logGroupArn}:log-stream:${fhLogStreamS3.logStreamName}`, + `${fhLogGroup.logGroupArn}:log-stream:${fhLogStreamS3Backup.logStreamName}` + ], + actions: ['logs:CreateLogStream', 'logs:PutLogEvents'], effect: iam.Effect.ALLOW }) ); @@ -83,10 +105,16 @@ export class FirehoseLambdaStack extends cdk.Stack { deliveryStreamType: 'DirectPut', extendedS3DestinationConfiguration: { bucketArn: bucket.bucketArn, + prefix: 'transformed-data/', bufferingHints: { intervalInSeconds: 60, sizeInMBs: 1 }, + cloudWatchLoggingOptions: { + enabled: true, + logGroupName: fhLogGroup.logGroupName, + logStreamName: fhLogStreamS3.logStreamName + }, roleArn: firehose_role.roleArn, processingConfiguration: { enabled: true, @@ -100,7 +128,18 @@ export class FirehoseLambdaStack extends cdk.Stack { }, encryptionConfiguration: { noEncryptionConfig: 'NoEncryption' - } + }, + s3BackupMode: 'Enabled', + s3BackupConfiguration: { + bucketArn: bucket.bucketArn, + prefix: 'original-source-data/', + roleArn: firehose_role.roleArn, + cloudWatchLoggingOptions: { + enabled: true, + logGroupName: fhLogGroup.logGroupName, + logStreamName: fhLogStreamS3Backup.logStreamName + } + }, } } ); diff --git a/firehose-transformation-cdk-typescript/src/package.json b/firehose-transformation-cdk-typescript/src/package.json index 9b34d62fa..8e03f56d0 100644 --- a/firehose-transformation-cdk-typescript/src/package.json +++ b/firehose-transformation-cdk-typescript/src/package.json @@ -11,11 +11,8 @@ "cdk": "cdk" }, "devDependencies": { - "@types/jest": "^29.5.5", "@types/node": 
"20.7.1", "aws-cdk": "2.102.0", - "jest": "^29.7.0", - "ts-jest": "^29.1.1", "ts-node": "^10.9.1", "typescript": "~5.2.2" }, From 468b8056f6c6eda8dea2d2670dd62b65cac295bd Mon Sep 17 00:00:00 2001 From: Ben <9841563+bfreiberg@users.noreply.github.com> Date: Mon, 15 Jan 2024 08:49:51 +0100 Subject: [PATCH 5/6] Add final pattern file --- ...irehose-transformation-cdk-typescript.json | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 firehose-transformation-cdk-typescript/firehose-transformation-cdk-typescript.json diff --git a/firehose-transformation-cdk-typescript/firehose-transformation-cdk-typescript.json b/firehose-transformation-cdk-typescript/firehose-transformation-cdk-typescript.json new file mode 100644 index 000000000..cc061aece --- /dev/null +++ b/firehose-transformation-cdk-typescript/firehose-transformation-cdk-typescript.json @@ -0,0 +1,95 @@ +{ + "title": "Kinesis Firehose Data Transformation with Lambda", + "description": "Transform incoming source data and deliver the transformed data to S3.", + "language": "TypeScript", + "level": "200", + "framework": "CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to transform streaming data received by Kinesis Data Firehose using AWS Lambda before delivering the transformed data to Amazon S3.", + "To transform incoming source data, Kinesis Data Firehose invokes the specified Lambda function with each buffered batch using the AWS Lambda synchronous invocation mode. The transformed data is sent from Lambda to Kinesis Data Firehose. Kinesis Data Firehose then sends it to the destination S3 bucket when the specified destination buffering size or buffering interval is reached, whichever happens first."
+ ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/firehose-transformation-cdk-typescript", + "templateURL": "serverless-patterns/firehose-transformation-cdk-typescript", + "projectFolder": "firehose-transformation-cdk-typescript", + "templateFile": "src/lib/firehose-lambda-stack.ts" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon Kinesis Data Firehose Data Transformation", + "link": "https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html" + }, + { + "text": "Using AWS Lambda with Amazon Kinesis Data Firehose", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html" + }, + { + "text": "Ingesting enriched IoT data into Amazon S3 using Amazon Kinesis Data Firehose", + "link": "https://aws.amazon.com/blogs/iot/ingesting-enriched-iot-data-into-amazon-s3-using-amazon-kinesis-data-firehose/" + }, + { + "text": "Capture clickstream data using AWS serverless services", + "link": "https://aws.amazon.com/blogs/industries/capture-clickstream-data-using-aws-serverless-services/" + } + ] + }, + "deploy": { + "text": [ + "cdk deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." 
+ ] + }, + "cleanup": { + "text": [ + "cdk delete" + ] + }, + "authors": [ + { + "name": "Edward Schaefer", + "image": "https://d2siip5gg18ho0.cloudfront.net/images/schaeedw-photo-centered_250x250.jpg", + "bio": "Solutions Architect @ Amazon Web Services", + "linkedin": "ejschaefer" + } + ], + "patternArch": { + "icon1": { + "x": 20, + "y": 50, + "service": "kinesis-firehose", + "label": "Amazon Kinesis Firehose" + }, + "icon2": { + "x": 50, + "y": 50, + "service": "lambda", + "label": "AWS Lambda" + }, + "icon3": { + "x": 80, + "y": 50, + "service": "s3", + "label": "S3" + }, + "line1": { + "from": "icon1", + "to": "icon2", + "label": "" + }, + "line2": { + "from": "icon2", + "to": "icon3", + "label": "" + } + } +} From 0780df39037e8570e850026fb0d933bbfb6c382e Mon Sep 17 00:00:00 2001 From: Ben <9841563+bfreiberg@users.noreply.github.com> Date: Mon, 15 Jan 2024 08:51:47 +0100 Subject: [PATCH 6/6] Fix typo in cleanup instructions --- .../firehose-transformation-cdk-typescript.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firehose-transformation-cdk-typescript/firehose-transformation-cdk-typescript.json b/firehose-transformation-cdk-typescript/firehose-transformation-cdk-typescript.json index cc061aece..ce2f8d9ff 100644 --- a/firehose-transformation-cdk-typescript/firehose-transformation-cdk-typescript.json +++ b/firehose-transformation-cdk-typescript/firehose-transformation-cdk-typescript.json @@ -51,7 +51,7 @@ }, "cleanup": { "text": [ - "cdk delete" + "cdk destroy" ] }, "authors": [