
AWS Lambda Function as Amazon DynamoDB Stream Trigger with Python Code


In this AWS tutorial for developers, I want to show how a DynamoDB stream can be used to capture item-level changes in an Amazon DynamoDB table. The captured changes can be new items inserted into the table, item modifications, or item deletions. The sample application I share in this tutorial includes a sample Amazon DynamoDB table and a DynamoDB stream that triggers an AWS Lambda function to process the changed data. This Lambda function, developed in Python, sends an Amazon SNS email notification to subscriber email addresses and writes the event data into a separate file in an Amazon S3 bucket.

Here is the simplified architectural diagram of the solution described in this Amazon Web Services tutorial for serverless AWS Lambda developers: changes in an Amazon DynamoDB table are logged in an Amazon S3 bucket, and SNS email notifications are sent to the subscribers of a specific SNS topic.

Amazon DynamoDB stream triggering AWS Lambda function

Here are the steps to follow for implementing the architecture shared above, utilizing Amazon DynamoDB streams to capture and process item-level changes on a DynamoDB table:

First of all, create the DynamoDB table.
Create an empty AWS Lambda function with Python.
Enable the DynamoDB stream for the target DynamoDB table and create a new trigger which calls the recently created Lambda function.
Create an Amazon S3 bucket to store captured data events.
Create an Amazon SNS topic and subscribe to this topic using your email.

I will not go deep into each step; to keep the tutorial simple, I will leave some steps to the reader.


Create Amazon DynamoDB Table and Enable DynamoDB Stream

Launch the Amazon DynamoDB service on the AWS Management Console and create a new table.
As seen below, I just used a string field named username as the partition key of my new sample DynamoDB table.

sample Amazon DynamoDB table creation

After the DynamoDB table is created, display its details and drill down into the Exports and streams tab to enable the DynamoDB stream.
As you can see in the following screenshot, the DynamoDB stream on a new table is disabled by default.
To enable the DynamoDB stream, just click on the Enable button.
This will enable AWS data engineers to capture item-level changes on an Amazon DynamoDB table and process them using an AWS Lambda function via a trigger.

enable DynamoDB stream to capture item level changes on a table

To process both the previous and the new attributes and values of an Amazon DynamoDB table item in case of an item modification, I choose "New and old images" to push to the DynamoDB stream we are creating at this moment.
Press Enable stream to complete this task.

enable stream for Amazon DynamoDB table
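
If you prefer scripting over the console, the same stream can be enabled with boto3. Here is a minimal sketch, assuming the table is named SampleDynamoDBTable as in the rest of this tutorial.

import boto3

dynamodb = boto3.client('dynamodb')

# Enable a stream that captures both the old and the new item images,
# matching the "New and old images" option selected in the console.
dynamodb.update_table(
    TableName='SampleDynamoDBTable',
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES'
    }
)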

Now we can define a trigger.

create trigger to invoke AWS Lambda function on Amazon DynamoDB table

Click on "Create trigger" button.

As seen in the screenshot below, the AWS data engineer can select one of the existing Lambda functions here.
If you have not yet created the AWS Lambda function, you can start creating the serverless function by pressing the "Create new" button. But let's skip this step for now and continue reading.
The DynamoDB stream trigger will invoke this serverless function according to the specification identified here.
The Batch size property is important. If you want to call the Lambda function every time a DynamoDB table item is affected, set the batch size to 1.
If you provide bigger values, the DynamoDB stream will wait until that number of items have been inserted, modified, or deleted on the current table before calling the Lambda function.
Make sure that you have marked the "Enable trigger" checkbox.

select AWS Lambda function for Amazon DynamoDB stream trigger
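
The same trigger can also be created programmatically. Below is a minimal boto3 sketch; the stream ARN and the function name are placeholders to be replaced with your own values.

import boto3

lambda_client = boto3.client('lambda')

# Connect the DynamoDB stream to the Lambda function with a batch size of 1,
# so the function is invoked for every single item-level change.
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:dynamodb:eu-central-1:111111111111:table/SampleDynamoDBTable/stream/2021-05-12T12:27:12.779',
    FunctionName='DynamoDBStreamTrigger',
    StartingPosition='LATEST',
    BatchSize=1,
    Enabled=True
)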


Create Amazon SNS Topic

Let's now create our sample Amazon Simple Notification Service (SNS) topic.
Launch Amazon SNS service on AWS Management Console.

On the left menu, you will see Topics.
Start creating the sample topic by pressing the "Create topic" button.

Amazon SNS topics

Enter the topic name, for example, "DynamoDBItemChanges", and choose the Standard type for our Amazon SNS topic. You can keep all other options at their defaults and create the topic.

new Amazon SNS topic creation for DynamoDB stream trigger notifications
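
As a side note, the same topic can be created with a single boto3 call; a minimal sketch:

import boto3

sns = boto3.client('sns')

# Create a Standard SNS topic named as in the console step above.
response = sns.create_topic(Name='DynamoDBItemChanges')
print(response['TopicArn'])  # keep this ARN for the subscription and the Lambda code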

Now we can subscribe to this SNS topic using our email addresses.

If you open the details page of the Amazon SNS topic we have just created, you will see the Subscriptions tab and the Create subscription button.

create subscription for Amazon SNS topic

Start creating a subscription by clicking on the button. In the next step, select Email as the "Protocol" and enter your test email address as the Endpoint.

subscribe to an Amazon SNS topic via email

As noted on the screen, after you create your subscription, one more step is required. You must confirm the email subscription by pressing the Confirm subscription link in the confirmation email that will be sent to your subscription address immediately.
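
For completeness, the email subscription can also be created with boto3; the topic ARN and the email address below are placeholders.

import boto3

sns = boto3.client('sns')

# Subscribe an email address to the topic; the subscription stays in
# "PendingConfirmation" state until the link in the confirmation email is clicked.
sns.subscribe(
    TopicArn='arn:aws:sns:eu-central-1:111111111111:DynamoDBItemChanges',
    Protocol='email',
    Endpoint='your.name@example.com'
)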

After this step is completed, we can continue with the Amazon S3 configuration.


Create Amazon S3 Bucket

Now on AWS Management Console, launch the object storage service Amazon S3 and create a bucket.

create Amazon S3 bucket
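
If you prefer the AWS SDK, here is a minimal boto3 sketch creating the bucket; the bucket name matches the one used in the Lambda function code later in this tutorial, and the region is an assumption.

import boto3

s3 = boto3.client('s3')

# Create the bucket that will store the captured DynamoDB event files.
# Outside us-east-1 the region must be passed as a LocationConstraint.
s3.create_bucket(
    Bucket='dynamodbstreamdata',
    CreateBucketConfiguration={'LocationConstraint': 'eu-central-1'}
)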


Create AWS Lambda Function

Let's now create and configure our AWS Lambda function, developed in Python 3.8.
I named the Lambda function "DynamoDBStreamTrigger".

First we can configure the Lambda function trigger. Press the "+ Add trigger" button in the "Function overview" section.

On the Trigger configuration page, select "DynamoDB" as the AWS service and select our DynamoDB table "SampleDynamoDBTable" which we created for this tutorial.
Change the batch size from its default value of 100 to 1 on the trigger configuration page.
You can have a look at the additional settings, but this is enough for this tutorial.
Just don't forget to check that the "Enable trigger" checkbox is marked, then click on "Add".

add DynamoDB trigger to AWS Lambda function

You will get the following error because of missing permissions:
An error occurred when creating the trigger: Cannot access stream arn:aws:dynamodb:eu-central-1:1111111111:table/SampleDynamoDBTable/stream/2021-05-12T12:27:12.779.
Please ensure the role can perform the GetRecords, GetShardIterator, DescribeStream, ListShards, and ListStreams Actions on your stream in IAM.
(Service: AWSLambda; Status Code: 400; Error Code: InvalidParameterValueException; Request ID: 3e705721-3192-4da0-89d4-c59ad2628ee8; Proxy: null)

Since our AWS Lambda function will interact with various services like Amazon DynamoDB, Amazon S3, and Amazon SNS, the basic AWS Lambda function execution role will not be enough and will cause permission errors.

To prevent serverless function execution errors, we can create an AWS IAM role beforehand, assign the required privileges ahead of time, and select this IAM role as the execution role of the Lambda function during creation.

Of course, you can also start your AWS development with the basic AWS Lambda execution role and add each policy or permission one by one according to the permission exceptions you experience during development and tests.
Just as we are going to do now.

On our AWS Lambda function's main screen, you can see a tab named Configuration, which has sub-sections like Permissions.

AWS Lambda execution role

Click on the execution role name to navigate to the AWS IAM role definition, where we can attach IAM policies and modify the role permissions.

On "Permissions" tab, you will see a basic "AWSLambdaBasicExecutionRole-......" policy.
We will now Add inline policy for DynamoDB access requirements. Click on the "Add inline policy" button.

Choose DynamoDB as the service.
Find the actions mentioned in the execution role exception, identified as missing privileges, and mark them:
GetRecords, GetShardIterator, DescribeStream, and ListStreams among the "Read" actions.

inline policy for AWS Lambda function IAM execution role

If you look at the above inline policy creation template, you will see that I also restricted the permissions to only one specific Amazon DynamoDB stream. If you want to restrict the IAM policy to implement the best practice of granting least privilege, you can copy the ARN (Amazon Resource Name) of the target DynamoDB table stream.

DynamoDB Stream ARN value
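
The same inline policy can be attached with boto3 as well. In the sketch below, the role name is hypothetical (use your function's generated execution role) and the stream ARN is a placeholder.

import json
import boto3

iam = boto3.client('iam')

# Inline policy granting the stream read actions listed in the error message,
# restricted to a single DynamoDB stream ARN (least privilege).
policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Action': [
            'dynamodb:GetRecords',
            'dynamodb:GetShardIterator',
            'dynamodb:DescribeStream',
            'dynamodb:ListStreams'
        ],
        'Resource': 'arn:aws:dynamodb:eu-central-1:111111111111:table/SampleDynamoDBTable/stream/*'
    }]
}

iam.put_role_policy(
    RoleName='DynamoDBStreamTrigger-role',  # hypothetical execution role name
    PolicyName='DynamoDBStreamReadAccess',
    PolicyDocument=json.dumps(policy)
)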

Now if we go a step back and try to add the DynamoDB service as a trigger to our Lambda function, you will see it succeeds this time.
You will see a message similar to "The trigger SampleDynamoDBTable was successfully added to function DynamoDBStreamTrigger. The function is now receiving events from the trigger."
If the Lambda function trigger creation is not successful, please review the policy attached to or created for the IAM role defined as the execution role of the Lambda function.

AWS Lambda function with DynamoDB service added as a trigger

After the Amazon DynamoDB stream is added to the AWS Lambda function as a trigger, if you switch to the "Exports and streams" tab on the DynamoDB table properties screen, you will see the Lambda function name listed under DynamoDB Stream triggers.

Amazon DynamoDB Stream triggers

We still have one more step regarding execution role permissions, related to SNS notifications and Amazon S3 bucket access.

Let's create an additional inline policy and allow creation of new files in our target Amazon S3 bucket as follows:
Add inline policy
Select S3 as the service
Mark PutObject among the Write actions.
Review and save the inline policy to attach it to our AWS Lambda execution role.

Of course, you can also limit the permission to a specific Amazon S3 bucket using the ARN value of the bucket, which you can find on the Properties tab of the target S3 bucket's details screen.

allow Lambda function to call PutObject for specific S3 bucket


For the missing Amazon SNS permission, we need to grant the SNS:Publish permission to our AWS Lambda execution role.

Let's create another inline policy by following the steps below:
Add inline policy
Select SNS as the service
Mark Publish among the Write actions.
To limit the permission to a specific Amazon SNS topic, copy the Amazon Resource Name, aka ARN, from the target SNS topic details page.
Review and save the inline policy. This completes our tasks related to granting IAM permissions to our serverless function execution role.

Amazon SNS Publish permission for specific topic
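
Both of these remaining permissions can be granted with the same put_role_policy pattern shown earlier; here is a sketch combining them into one inline policy, with placeholder ARNs and a hypothetical role name.

import json
import boto3

iam = boto3.client('iam')

# One inline policy covering both the S3 PutObject and the SNS Publish
# permissions, each restricted to a single resource ARN.
policy = {
    'Version': '2012-10-17',
    'Statement': [
        {
            'Effect': 'Allow',
            'Action': 's3:PutObject',
            'Resource': 'arn:aws:s3:::dynamodbstreamdata/*'
        },
        {
            'Effect': 'Allow',
            'Action': 'sns:Publish',
            'Resource': 'arn:aws:sns:eu-central-1:111111111111:DynamoDBItemChanges'
        }
    ]
}

iam.put_role_policy(
    RoleName='DynamoDBStreamTrigger-role',  # hypothetical execution role name
    PolicyName='S3AndSNSAccess',
    PolicyDocument=json.dumps(policy)
)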

Let's now continue with coding the AWS Lambda function in Python.

Data engineers and AWS Lambda programmers can simply copy and paste the following Python code block, replacing their function's template code.
Please verify the indentation before executing the code. During copy and paste, the indentation might get corrupted, causing the Python interpreter to raise an exception.

Please note that I use boto3 for accessing both Amazon S3 and Amazon SNS services.

You can see that an SNS client object named "client" is created and its publish method is used for sending notifications to subscribers. The ARN of the SNS topic is provided as an input argument to the publish method, just like the message. Please replace the one in the Python code with your SNS topic's ARN.

I write the incoming event data directly into a separate file in the target Amazon S3 bucket.
You can see that I create an object named "s3" and, based on that, another object "s3object" representing a file which I create in the S3 bucket using the put method.
I also used the prefix "SampleDynamoDBTable/CDC/" while creating the file in Amazon S3.
This simply builds a folder-like structure within the target Amazon S3 bucket.

import json
import boto3

def lambda_handler(event, context):

    # Concatenate the event names (INSERT, MODIFY, REMOVE) of all records in
    # the batch and keep the eventID of the last record for the S3 file name.
    txt = ""
    myeventID = ""
    for record in event['Records']:
        txt = txt + record['eventName']
        myeventID = record['eventID']

    # Build the notification message from the record count and the partition
    # key of the last processed record.
    n = str(len(event['Records']))
    k = record['dynamodb']['Keys']['username']['S']
    m = 'Successfully processed %s records. Keys: %s.' % (n, k)
    message = {"message": m}

    # Publish an email notification to the subscribers of the SNS topic.
    # Replace the TargetArn below with your own SNS topic's ARN.
    client = boto3.client('sns')
    response = client.publish(
        TargetArn='arn:aws:sns:eu-central-1:111111111111:DynamoDBItemChanges',
        Message=json.dumps({'default': json.dumps(message)}),
        MessageStructure='json'
    )

    # Write the raw event data as a JSON file into the target S3 bucket,
    # using a folder-like prefix named after the source table.
    s3 = boto3.resource('s3')
    s3bucket = 'dynamodbstreamdata'
    fname = 'SampleDynamoDBTable/CDC/' + str(myeventID) + '.json'
    s3object = s3.Object(s3bucket, fname)
    s3object.put(
        Body=json.dumps(event).encode('UTF-8')
    )

    return {
        'statusCode': 200,
        'body': json.dumps('-' + txt + '-')
    }
DynamoDB Stream Trigger AWS Lambda Function Python Code

After you create, modify, or delete an item in the Amazon DynamoDB table, you will get an email notification via the Amazon SNS topic subscription and find a JSON file containing the event details in the target Amazon S3 bucket.

Since we set the batch size for the DynamoDB trigger to 1, if you for example delete 3 items at once from the DynamoDB table, you will receive 3 separate emails, one for each deleted item.

If you experience problems while executing the AWS Lambda function, you can always refer to the Logs under the Monitor tab of the Lambda function. You can find the same logs under the Amazon CloudWatch service, too.

Amazon CloudWatch logs for AWS Lambda function

I hope this tutorial showing the DynamoDB Streams and triggers feature of DynamoDB tables is useful for AWS Lambda function programmers and for data engineers who work with Amazon DynamoDB.




