AWS Lambda Function as Amazon DynamoDB Stream Trigger with Python Code
In this AWS tutorial for developers, I want to show how a DynamoDB stream can be used to capture item-level changes in an Amazon DynamoDB table. The captured changes can be new item entries into a DynamoDB table, item modifications, or item deletions. The sample application I share in this AWS tutorial includes a sample Amazon DynamoDB table and a DynamoDB stream which triggers an AWS Lambda function for processing the changed data within the DynamoDB table. This AWS Lambda function, developed in Python, includes sample code which sends an Amazon SNS email notification to subscriber email addresses and writes the event data as a separate file into an Amazon S3 bucket.
Here is the simplified architectural diagram of the solution described in this Amazon Web Services tutorial for serverless AWS Lambda developers, which logs changes in an Amazon DynamoDB table to an Amazon S3 bucket and sends SNS email notifications to subscribers of a specific SNS topic.
Here are the steps to follow for implementing the architecture shared above, utilizing Amazon DynamoDB streams to catch and process item-level changes on a DynamoDB table.
First of all, create the DynamoDB table.
Create an empty AWS Lambda function with Python.
Enable the DynamoDB stream for the target DynamoDB table and create a new trigger which calls the recently created Lambda function.
Create an Amazon S3 bucket to store captured data events.
Create an Amazon SNS topic and subscribe to this topic using your email.
I will not go deep into each step; to keep the tutorial simple, I will leave some steps to the reader.
Create Amazon DynamoDB Table and Enable DynamoDB Stream
Launch the Amazon DynamoDB service on the AWS Management Console and create a new table.
As seen below, I just used a string field named username as the partition key of my new sample DynamoDB table.
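If you prefer scripting this step instead of using the console, here is a minimal boto3 sketch; the table name "SampleDynamoDBTable" matches the one used later in this tutorial, and the on-demand billing mode is my assumption.

```python
import boto3

# A sketch of creating the sample table programmatically; the on-demand
# billing mode is an assumption, any capacity mode works for this tutorial
dynamodb = boto3.client("dynamodb")
dynamodb.create_table(
    TableName="SampleDynamoDBTable",
    AttributeDefinitions=[{"AttributeName": "username", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "username", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",
)
```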
After the DynamoDB table is created, display its details and drill down into the "Exports and streams" tab to enable the DynamoDB stream.
As you can see in the following screenshot, by default the DynamoDB stream on a new table is disabled.
To enable the DynamoDB stream, just click on the Enable button.
This enables AWS data engineers to capture item-level changes on an Amazon DynamoDB table and process them using an AWS Lambda function via a trigger.
To process both the previous and the new state, that is the attributes and values of an Amazon DynamoDB table item in case of an item modification, I chose "New and old images" as the information pushed to the DynamoDB stream we are creating at this moment.
Press Enable stream to complete this task.
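For readers who script their infrastructure, the same setting can be applied with boto3 as a sketch; "NEW_AND_OLD_IMAGES" is the stream view type corresponding to the "New and old images" option chosen above.

```python
import boto3

# Enable the stream on the existing table; NEW_AND_OLD_IMAGES corresponds
# to the "New and old images" option in the console
dynamodb = boto3.client("dynamodb")
dynamodb.update_table(
    TableName="SampleDynamoDBTable",
    StreamSpecification={
        "StreamEnabled": True,
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
)
```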
Now we can define a trigger.
Click on "Create trigger" button.
As seen in the screenshot below, the AWS data engineer can select one of the existing Lambda functions here.
If you have not yet created the AWS Lambda function, you can start creating the serverless function by pressing the "Create new" button. But let's skip this step for now and continue reading.
The DynamoDB stream trigger will invoke or call this serverless function according to the specification identified here.
The Batch size property is important. If you want to call the Lambda function every time a DynamoDB table item is affected, then set the batch size to 1.
If you provide a bigger value, the DynamoDB stream will wait until that number of items have been inserted, modified, or deleted on the current table before calling the Lambda function.
Make sure that you have marked the "Enable trigger" checkbox.
Create Amazon SNS Topic
Let's now create our sample Amazon Simple Notification Service (SNS) topic.
Launch the Amazon SNS service on the AWS Management Console.
On the left menu, you will see Topics.
Then start creating the sample topic by pressing the "Create topic" button.
Enter the topic name, for example, "DynamoDBItemChanges" and choose the Standard type for our Amazon SNS topic. You can keep all other options as default and create the topic.
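As a scripted alternative to the console steps above, a short boto3 sketch can create the same topic; the returned topic ARN will be needed later.

```python
import boto3

# Create the standard SNS topic; keep the returned TopicArn, it is needed
# for the subscription, the IAM policy, and the Lambda function code
sns = boto3.client("sns")
topic_arn = sns.create_topic(Name="DynamoDBItemChanges")["TopicArn"]
print(topic_arn)
```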
Now we can subscribe to this SNS topic using our email addresses.
If you open the details page for the Amazon SNS topic which we have just created, you will see the Subscriptions tab and the Create subscription button.
Start the creation of a subscription by clicking on the button. In the next step, select Email as the "Protocol" and enter your test email address as the Endpoint.
As noted on the screen, after you create your subscription, one more step is required. You must confirm the email subscription by pressing the Confirm subscription link in an email that will be sent to your subscription address immediately.
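The subscription step can also be scripted as in the sketch below; the topic ARN and the email address are placeholders, and the subscription must still be confirmed via the emailed link.

```python
import boto3

# Subscribe a test email address to the topic; replace the topic ARN and
# the address with your own values, then confirm via the emailed link
sns = boto3.client("sns")
sns.subscribe(
    TopicArn="arn:aws:sns:eu-central-1:111111111111:DynamoDBItemChanges",
    Protocol="email",
    Endpoint="your-address@example.com",
)
```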
After this step is completed, we can continue with the Amazon S3 configuration.
Create Amazon S3 Bucket
Now, on the AWS Management Console, launch the object storage service Amazon S3 and create a bucket.
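If you script this step, keep in mind that bucket names are globally unique; the bucket name and region below are placeholders I reuse in the rest of this tutorial's code sketches.

```python
import boto3

# Create the target bucket; replace "my-dynamodb-cdc-bucket" with your own
# unique name; CreateBucketConfiguration is required outside us-east-1
s3 = boto3.client("s3")
s3.create_bucket(
    Bucket="my-dynamodb-cdc-bucket",
    CreateBucketConfiguration={"LocationConstraint": "eu-central-1"},
)
```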
Create AWS Lambda Function
Let's now create and configure our AWS Lambda function, which is developed in Python 3.8.
I named the Lambda function "DynamoDBStreamTrigger".
First we can configure the Lambda function trigger. Press the "+ Add trigger" button in the "Function overview" section.
On the trigger configuration page, select "DynamoDB" as the AWS service and select the DynamoDB table "SampleDynamoDBTable" which we created for this tutorial.
Change the batch size to 1 from its default value of 100 on the trigger configuration page.
You can have a look at the additional settings, but this is enough for this tutorial.
Just don't forget to check that the "Enable trigger" checkbox is marked, then click on "Add".
You will get the following error because of missing permissions:
An error occurred when creating the trigger: Cannot access stream arn:aws:dynamodb:eu-central-1:1111111111:table/SampleDynamoDBTable/stream/2021-05-12T12:27:12.779.
Please ensure the role can perform the GetRecords, GetShardIterator, DescribeStream, ListShards, and ListStreams Actions on your stream in IAM.
(Service: AWSLambda; Status Code: 400; Error Code: InvalidParameterValueException; Request ID: 3e705721-3192-4da0-89d4-c59ad2628ee8; Proxy: null)
Since our AWS Lambda function will interact with various services like Amazon DynamoDB, Amazon S3, and Amazon SNS, the basic AWS Lambda function execution role will not be enough and will cause permission errors.
To prevent serverless function execution errors, we can create an AWS IAM role beforehand, assign the required privileges ahead of time, and select this IAM role as the execution role of the Lambda function during its creation.
Of course, you can start your AWS development with the basic AWS Lambda execution role and add each policy or permission one by one according to the permission exceptions you experience during development and testing.
Just as we are going to do now.
On our AWS Lambda function's main screen, you can see a tab named Configuration, which has sub-sections like Permissions.
Click on the execution role name to navigate to the AWS IAM role definition, where we can attach IAM policies and modify the role permissions.
On "Permissions" tab, you will see a basic "AWSLambdaBasicExecutionRole-......" policy.
We will now add an inline policy for the DynamoDB access requirements. Click on the "Add inline policy" button.
Choose DynamoDB as the service.
Find the actions mentioned in the execution role exception, identified as missing privileges, and mark them:
GetRecords, GetShardIterator, DescribeStream, and ListStreams among the "Read" actions.
If you look at the above inline policy creation template, you will see that I also restricted the permissions to only one specific Amazon DynamoDB stream. If you want to restrict the IAM policy to implement the best practice of granting least privilege, you can copy the ARN (Amazon Resource Name) of the target DynamoDB table stream.
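If you prefer attaching the inline policy programmatically instead of using the visual editor, here is a boto3 sketch; the role name and the stream ARN are placeholders which you should copy from your own console.

```python
import json
import boto3

# Attach an inline policy granting the stream read actions listed in the
# error message; role name and stream ARN below are placeholders
iam = boto3.client("iam")
iam.put_role_policy(
    RoleName="DynamoDBStreamTrigger-role",
    PolicyName="DynamoDBStreamReadAccess",
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:ListStreams",
            ],
            "Resource": "arn:aws:dynamodb:eu-central-1:111111111111:table/SampleDynamoDBTable/stream/*",
        }],
    }),
)
```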
Now if we go a step back and try to add the DynamoDB service as a trigger to our Lambda function, we will see that it is successful this time.
You will see a message similar to "The trigger SampleDynamoDBTable was successfully added to function DynamoDBStreamTrigger. The function is now receiving events from the trigger."
If the Lambda function trigger creation is not successful, please review the policy attached or created for the IAM role defined as the execution role of the Lambda function.
After the Amazon DynamoDB Stream is added to the AWS Lambda function as a trigger, if you switch to the DynamoDB table properties screen "Exports and streams" tab, you will see the Lambda function name listed under DynamoDB Stream triggers.
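For completeness, the trigger, called an event source mapping in AWS terms, can also be created with boto3; the stream ARN below is a placeholder following the format shown in the earlier error message.

```python
import boto3

# Create the trigger (event source mapping) programmatically; BatchSize=1
# makes the function run once per changed item, as configured in the console
lambda_client = boto3.client("lambda")
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:dynamodb:eu-central-1:111111111111:table/SampleDynamoDBTable/stream/2021-05-12T12:27:12.779",
    FunctionName="DynamoDBStreamTrigger",
    StartingPosition="LATEST",
    BatchSize=1,
    Enabled=True,
)
```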
We still have one step regarding execution role permissions, related to SNS notifications and Amazon S3 bucket access.
Let's create an additional inline policy and allow creation of new files in our target Amazon S3 bucket as follows:
Add an inline policy.
Select S3 as the service.
Mark PutObject among the Write actions.
Review and save the inline policy to attach it to our AWS Lambda execution role.
Of course, you can also limit the permission to a specific Amazon S3 bucket using the ARN value of the bucket, which you can find on the Properties tab of the target S3 bucket's details screen.
For the missing Amazon SNS permission, we need to grant the SNS:Publish permission to our AWS Lambda execution role.
Let's create another inline policy by following the steps below:
Add inline policy
Select SNS as the service
Mark Publish among Write actions.
In order to limit the permission to a specific Amazon SNS topic, please copy the Amazon Resource Name (ARN) from the target SNS topic's details page.
Review and save the inline policy. This ends our tasks related to granting IAM permissions to our serverless function execution role.
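Both inline policies can also be expressed together as a single policy document; in the boto3 sketch below, the role name, bucket name, and topic ARN are placeholders to replace with your own values.

```python
import json
import boto3

# One inline policy covering both the S3 PutObject and the SNS Publish
# permissions; role name, bucket name, and topic ARN are placeholders
iam = boto3.client("iam")
iam.put_role_policy(
    RoleName="DynamoDBStreamTrigger-role",
    PolicyName="S3WriteAndSNSPublish",
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::my-dynamodb-cdc-bucket/*",
            },
            {
                "Effect": "Allow",
                "Action": "SNS:Publish",
                "Resource": "arn:aws:sns:eu-central-1:111111111111:DynamoDBItemChanges",
            },
        ],
    }),
)
```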
Let's now continue with coding the AWS Lambda function in Python.
Data engineers and AWS Lambda programmers can simply copy and paste the following Python code block, replacing their function's template code.
Please verify the indentation before executing the code. During copy and paste, the indentation might be corrupted, causing the Python interpreter to raise an exception.
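Here is a minimal sketch of the function, consistent with the explanations that follow; the SNS topic ARN, the S3 bucket name, and the generated file name scheme are placeholder values you must replace with your own.

```python
import json
from datetime import datetime

import boto3

# ARN of the Amazon SNS topic; replace it with your own topic's ARN
SNS_TOPIC_ARN = "arn:aws:sns:eu-central-1:111111111111:DynamoDBItemChanges"
# Name of the target Amazon S3 bucket; replace it with your own bucket name
S3_BUCKET_NAME = "my-dynamodb-cdc-bucket"


def lambda_handler(event, context):
    # Send an email notification with the event details to all subscribers
    client = boto3.client("sns")
    client.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject="DynamoDB table item change captured",
        Message=json.dumps(event),
    )

    # Write the incoming event data into a separate JSON file in the target
    # S3 bucket; the key prefix builds a folder-like structure in the bucket
    key = "SampleDynamoDBTable/CDC/" + datetime.utcnow().strftime("%Y%m%d-%H%M%S-%f") + ".json"
    s3 = boto3.resource("s3")
    s3object = s3.Object(S3_BUCKET_NAME, key)
    s3object.put(Body=json.dumps(event).encode("utf-8"))

    return {"statusCode": 200}
```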
Please note that I use boto3 for accessing both Amazon S3 and Amazon SNS services.
You can see that an SNS object named "client" is created and its publish method is used for sending notifications to subscribers. The ARN of the SNS topic is provided as an input argument to the publish method, just like the message. Please replace the one in the Python code with your SNS topic's ARN.
I directly write the incoming event data into a separate file in the target Amazon S3 bucket.
You can see that I create an object named "s3" and, based on it, another object named "s3object" representing a file which I create in the S3 bucket using the put method.
I also used the prefix "SampleDynamoDBTable/CDC/" while creating the file in Amazon S3.
This simply builds a folder-like structure within the target Amazon S3 bucket.
After you create, modify, or delete an item in the Amazon DynamoDB table, you will get an email notification via the Amazon SNS topic subscription and find a JSON file containing the event details in the target Amazon S3 bucket.
Since we set the batch size of the DynamoDB trigger to 1, if you, for example, delete 3 items at once from the DynamoDB table, you will receive 3 separate emails, one for each deleted item.
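If you want your function to react differently to inserts, modifications, and deletions, a small helper like the following sketch can inspect each record inside the incoming event; the field names follow the DynamoDB Streams record format.

```python
def describe_records(event):
    """Summarize each DynamoDB Streams record in a Lambda event."""
    for record in event["Records"]:
        action = record["eventName"]       # INSERT, MODIFY or REMOVE
        keys = record["dynamodb"]["Keys"]  # e.g. {"username": {"S": "john"}}
        old_image = record["dynamodb"].get("OldImage")  # MODIFY and REMOVE
        new_image = record["dynamodb"].get("NewImage")  # INSERT and MODIFY
        print(action, keys, old_image, new_image)
```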
If you experience problems while executing the AWS Lambda function, you can always refer to the Logs under the Monitor tab of the Lambda function. You can find the same logs under the Amazon CloudWatch service, too.
I hope this tutorial showing the DynamoDB Streams and triggers feature of DynamoDB tables is useful for AWS Lambda programmers and for data engineers who work with Amazon DynamoDB.