Amazon Firehose Kinesis Streaming Data Visualization with Kibana and ElasticSearch


In this tutorial, I want to show cloud developers how to create an Amazon Kinesis Firehose delivery stream and test it with demo streaming data, which is sent to Amazon Elasticsearch service for visualization with Kibana. Additionally, the streaming data coming into Amazon Kinesis Firehose is modified by a transformation function implemented as a serverless AWS Lambda function. During transformation, Kinesis Firehose keeps a backup of the streaming data in Amazon S3 buckets.

In short, in this AWS Amazon Web Services tutorial, cloud professionals will use a number of services like Amazon Kinesis Firehose, AWS Lambda functions, Amazon Elasticsearch, Amazon S3, AWS IAM Identity and Access Management service, Kibana as visualization and reporting tool and finally Amazon CloudWatch service for monitoring.


IAM Roles

In this tutorial, we are required to create two IAM roles.
IAM stands for AWS Identity and Access Management. By assigning IAM roles to services, cloud users grant those services permission to access other AWS resources on their behalf.

The first IAM role is for the AWS Lambda function, which receives streaming data from Amazon Firehose, transforms the data and returns it to the Amazon Firehose delivery service. Assigning this IAM role to the serverless Lambda function enables it to work with the Firehose delivery stream.

The second IAM role is created for the Amazon Firehose service so that it can use the AWS services it requires, such as S3 buckets. We will create this IAM role in a following section while creating the Amazon Firehose delivery stream. This Identity and Access Management role will enable the Kinesis Firehose delivery stream to access Amazon S3 buckets and store processed files containing the incoming stream messages.
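
What makes a role usable by a particular service is its trust policy. As a minimal sketch, the two roles described above could carry trust policies like the ones built below. The helper name trust_policy is hypothetical; the service principals lambda.amazonaws.com and firehose.amazonaws.com are the standard ones for these services. The console wizard generates equivalent documents for you, so this is only to show what happens behind the scenes.

```python
import json


def trust_policy(service_principal):
    """Build a trust policy that lets one AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service_principal},
                "Action": "sts:AssumeRole",
            }
        ],
    }


# Role 1: assumed by the Lambda transformation function
lambda_trust = trust_policy("lambda.amazonaws.com")
# Role 2: assumed by the Kinesis Firehose delivery stream
firehose_trust = trust_policy("firehose.amazonaws.com")

print(json.dumps(lambda_trust, indent=1))
print(json.dumps(firehose_trust, indent=1))
```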

Create IAM Role for AWS Lambda Function

create new role on Amazon IAM dashboard

Choose Lambda as the AWS service that will use this new IAM role. EC2 and Lambda are the most frequently used services, which is why they appear at the top of the list.

create role for AWS Lambda service on IAM dashboard

Click Next: Permissions button to continue with next step.

You will not want to grant the administrator role to any service in production environments.
On the other hand, to move faster with the tutorial, and since I will use this role only temporarily for the duration of this demo, I grant AdministratorAccess to this new Identity and Access Management IAM role.
Basically, the serverless function requires write access to Amazon S3 buckets.
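
For a setup that should live longer than this demo, a narrower policy than AdministratorAccess is preferable. The following is only a sketch under assumptions: it allows the function to write its own logs to CloudWatch and to put objects into one S3 bucket. The function name transformation_function_policy and the bucket name my-firehose-backup-bucket are hypothetical, and you should adjust the actions to what your function really does.

```python
import json


def transformation_function_policy(bucket_name):
    """Build a permissions policy that is narrower than AdministratorAccess."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Let the function write its logs to CloudWatch Logs
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": "arn:aws:logs:*:*:*",
            },
            {
                # Write access limited to objects in a single bucket
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": "arn:aws:s3:::%s/*" % bucket_name,
            },
        ],
    }


# Hypothetical bucket name, used for illustration only
policy = transformation_function_policy("my-firehose-backup-bucket")
print(json.dumps(policy, indent=1))
```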

attach permissions policies to new IAM role

One last step in creating Identity and Access Management roles is assigning tags (key-value pairs) to the role. In the future, you can use tags to filter resources among all the other cloud resources created in your account. This will simplify your tasks later.

define tags for your AWS IAM role

Review the role settings you have configured in previous steps and type a role name and provide a short description for your new IAM role. Then press Create Role button.

review and name your new AWS IAM role


Create Serverless Lambda Function for Transformation of Stream Data

The sample or demo data created for the Amazon Kinesis service to simulate incoming streaming data does not contain timestamp information indicating the date and time at which the event occurred. Therefore, to add a new field to the streaming data and to get rid of unneeded information, we will create a new serverless AWS Lambda function using Python. Then we will enable the Amazon Kinesis Firehose delivery stream to call this Lambda function to transform incoming streaming messages so that they can be processed successfully.

First, let's create a new AWS Lambda function from scratch, name it "Transformation-Function" and choose Python 2.7 as the runtime. Choose "Transformation-Function-Role" among the existing roles as the execution role of our new Lambda function. If the IAM role you have created is not listed in the existing roles dropdown list, press the refresh button on the right of the list to update it.

create AWS serverless Lambda function for Amazon Kinesis Firehose

Press "Create Function" button to continue with code creation after the basic settings of AWS Lambda function configuration.

Change the Lambda function Python code as follows replacing all previous sample code:

import base64
import json
from datetime import datetime

# Incoming Event
def lambda_handler(event, context):
    output = []
    now = datetime.utcnow().isoformat()

    # Loop through records in incoming Event
    for record in event['records']:

        # Extract message
        message = json.loads(base64.b64decode(record['data']))

        # Construct output
        data_field = {
            'timestamp': now,
            'ticker_symbol': message['ticker_symbol'],
            'price': message['price']
        }
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(data_field))
        }
        output.append(output_record)

    return {'records': output}
AWS Lambda function Python code

The Python code above adds a timestamp field required for reporting and removes unused data from the streaming message by executing the following tasks:

AWS conversion Lambda function code in Python

The Lambda code receives an event via the lambda_handler method. This event contains multiple streaming data records.
By looping through the records using the "for ... in ..." syntax, the code processes each record in the stream message.
Within the loop, we create a new output record from a new timestamp field (using the "now" variable) and two existing fields from the stream message: ticker symbol and price.
The demo data created as input for Amazon Kinesis does not include a timestamp field. For visualizing streaming data with Kibana, we require a datetime field. We add this timestamp using the AWS transformation Lambda function so that we can display the change of the stock price over time.
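
To try a handler like this before connecting it to Firehose, it helps to build a test event in the same shape. The sketch below creates such an event and decodes the records again. The helper names make_test_event and decode_records and the record ids are hypothetical, and the sector and change fields in the sample message are my assumption about what the demo data contains besides ticker_symbol and price.

```python
import base64
import json


def make_test_event(messages):
    """Wrap plain dicts the way the handler expects them: base64 encoded JSON."""
    records = []
    for number, message in enumerate(messages):
        payload = json.dumps(message).encode("utf-8")
        records.append({
            "recordId": "record-%d" % number,  # hypothetical record id
            "data": base64.b64encode(payload).decode("ascii"),
        })
    return {"records": records}


def decode_records(event_or_response):
    """Decode the 'data' field of every record back into a dict."""
    return [
        json.loads(base64.b64decode(record["data"]))
        for record in event_or_response["records"]
    ]


# Sample message; 'sector' and 'change' are assumed extra fields
event = make_test_event([
    {"ticker_symbol": "QXZ", "sector": "HEALTHCARE", "change": -0.05, "price": 84.51},
])
print(json.dumps(event, indent=1))
print(decode_records(event))
```

The event can be pasted into a test event on the Lambda console, and decode_records can also be applied to the value returned by the handler.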

Because this AWS serverless Lambda function will be integrated with Amazon Kinesis Firehose, increase its Timeout value to more than 1 minute. In general, the timeout and memory limits of each AWS Lambda function can be adjusted at any time in Basic Settings, according to the characteristics of the function.

AWS Lambda function memory and timeout settings

Click Save button to complete creation of AWS Lambda function which we will use to convert stream data content in following steps in this tutorial.


Create Amazon ElasticSearch Service Domain

On AWS Management Console launch Amazon Elasticsearch service dashboard to create the Elasticsearch domain we need.

Amazon ElasticSearch Service dashboard

First of all we need a domain. So press "Create a new domain" button.

In "Choose deployment type" step, select "Development and Testing" option for our domain. There is no need for this sample to build a highly available model.

Amazon Elasticsearch deployment type

For the Elasticsearch version, please choose version 6.3.
Other versions, for example 7.1, are not supported by Amazon Kinesis Firehose. If you select one of them, you may get an error message about the wrong version in later steps.

Amazon Elasticsearch version

At "Configure Cluster" step, type a descriptive short name for the Elasticsearch domain you are creating with this task.

Amazon Elasticsearch domain name

Since we are creating our Amazon ES domain only for the sake of this AWS tutorial, it is better to select a small instance type to stay in the free tier, or at least to keep the expense to a minimum.
So you can select "t2.small.elasticsearch" for the "Instance type" value.

Amazon Elasticsearch instance type

Number of instances will be automatically set to 1.
Do not enable "Dedicated master instances"; keep the default unselected option.

In the Storage section, keep the default values: EBS for Storage type, General Purpose SSD for EBS Volume type, and 10 GB for EBS storage size per node.

Amazon Elasticsearch storage type

Continue to Next screen.

In the Network configuration section, choose the "Public access" option instead of the recommended "VPC access" to make configuration easier.

public access network configuration for Amazon Elasticsearch domain

In "Access Policy" section, choose "Allow open access to the domain" option from the combo box list.

access policy for Elasticsearch domain

{
 "Version": "2012-10-17",
 "Statement": [
  {
   "Effect": "Allow",
   "Principal": {
    "AWS": [
     "*"
    ]
   },
   "Action": [
    "es:*"
   ],
   "Resource": "arn:aws:es:eu-west-3:123456789010:domain/kodyaz-elasticsearch-domain/*"
  }
 ]
}
IAM Access Policy

Confirm your selection by marking the "I accept the risk" checkbox.
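
Open access is acceptable only for a short demo. As a hedged alternative, the same policy can be limited to requests from known IP addresses with the aws:SourceIp condition key. The sketch below builds such a policy; the function name ip_restricted_policy is hypothetical and 192.0.2.0/24 is a documentation placeholder range, not a real network. I have not verified this variant with the delivery stream in this tutorial, so check that Firehose can still deliver to the domain if you use it.

```python
import json


def ip_restricted_policy(domain_arn, allowed_cidrs):
    """Build a domain access policy that only allows the given IP ranges."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": ["*"]},
                "Action": ["es:*"],
                # Only requests coming from these CIDR ranges are allowed
                "Condition": {"IpAddress": {"aws:SourceIp": list(allowed_cidrs)}},
                "Resource": domain_arn + "/*",
            }
        ],
    }


restricted = ip_restricted_policy(
    "arn:aws:es:eu-west-3:123456789010:domain/kodyaz-elasticsearch-domain",
    ["192.0.2.0/24"],  # placeholder range, replace with your own addresses
)
print(json.dumps(restricted, indent=1))
```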

Click Next button to continue with Review screen and press confirm button to complete Elasticsearch domain creation.

If everything goes as expected, after a few minutes you will see the message "You have successfully created an Elasticsearch domain." The domain status may stay in the "Loading" state, since it takes up to 10 minutes for the Elasticsearch domain to be initialized. Then the status will be "Active".

On "Overview" tab of our Elasticsearch domain, you will see a URL for "Kibana"

Amazon Elasticsearch domain details in dashboard screen

Click on the Kibana URL which will launch the Kibana address in a new web browser tab.

Kibana visualization tool for Elasticsearch data

We will use this Kibana address to visualize streaming data later in this tutorial.


Create Amazon Kinesis Firehose

On AWS Management Console, go to Amazon Kinesis dashboard and click on Get Started button to create our first stream.

Amazon Kinesis service on AWS

Amazon Kinesis service provides 4 options for developers: "Create Data Stream", "Create Delivery Stream", "Create Analytics Application", "Create Video Stream", to build different types of data streaming solutions on AWS.

Please choose the "Create Delivery Stream" option to use Amazon Kinesis Firehose, which makes streaming easier since it supports built-in delivery to AWS S3, Amazon Redshift, Amazon Elasticsearch and Kinesis Data Analytics services.

Amazon Kinesis Firehose delivery streams

The create delivery stream wizard consists of 5 steps. In the first step, "Name and source", type a short descriptive name for your stream.

new Amazon Kinesis Firehose delivery stream

Continue to the following step, "Process records", with the Next button.
This step has a configurable option which enables developers to transform source records using an AWS Lambda function.
Change the default selected option of "Record transformation" from Disabled to Enabled and select the AWS Lambda function we have created in an earlier step in this tutorial.

record transformation using AWS Lambda function for Amazon Kinesis Firehose delivery stream

You will be advised to increase the timeout set for the AWS Lambda function. If you have not done so yet, edit the Lambda function and increase the Timeout setting to more than 1 minute. You can find the Timeout value in the Basic Settings section.

Click Next button to continue with next screen "Select Destination".

In this step, choose Amazon Elasticsearch Service which is used especially for log analytics, real-time application monitoring and click stream analytics, etc.

choose Amazon Elasticsearch as destination of Kinesis Firehose streaming data

In the destination section, choose the Elasticsearch domain we have created in previous steps.
Type an index name and a type name in the related input textboxes.

Amazon Elasticsearch service as Kinesis Firehose delivery destination

Choose the S3 bucket you have created for this tutorial as the backup S3 bucket for the messages processed by Amazon Firehose.

backup S3 bucket for Kinesis Firehose delivery stream

Click Next to continue with the following "Configure settings" step.
Change the buffer size and buffer interval settings to 1 MB and 60 seconds.
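
With these two settings, Firehose delivers a batch when either the buffer size or the buffer interval is reached, whichever comes first. The following is a simplified model of that rule, not the actual Firehose implementation: the function name simulate_flushes is hypothetical, and I assume the interval timer starts with the first record placed in an empty buffer.

```python
def simulate_flushes(records, size_limit_bytes=1024 * 1024, interval_seconds=60):
    """Simplified model: flush on size limit or interval, whichever comes first.

    records: list of (arrival_second, size_bytes), sorted by arrival time.
    Returns a list of (flush_second, record_count, reason).
    """
    flushes = []
    buffer_start = None
    buffered_bytes = 0
    buffered_count = 0
    for arrival, size in records:
        # The interval expired before this record arrived: flush the old buffer
        if buffered_count and arrival - buffer_start >= interval_seconds:
            flushes.append((buffer_start + interval_seconds, buffered_count, "interval"))
            buffered_bytes = 0
            buffered_count = 0
        if buffered_count == 0:
            buffer_start = arrival  # timer starts with the first buffered record
        buffered_bytes += size
        buffered_count += 1
        # The buffer reached the size limit: flush immediately
        if buffered_bytes >= size_limit_bytes:
            flushes.append((arrival, buffered_count, "size"))
            buffered_bytes = 0
            buffered_count = 0
    if buffered_count:
        flushes.append((buffer_start + interval_seconds, buffered_count, "interval"))
    return flushes


# Three large records fill 1 MB quickly, two small ones wait for the interval
sample = [(0, 400000), (10, 400000), (20, 400000), (30, 100), (100, 100)]
flushes = simulate_flushes(sample)
print(flushes)
```

With small demo records, the 60 second interval is the condition that normally triggers delivery, which is why new data appears in Kibana roughly once a minute.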

Amazon Elasticsearch domain buffer conditions

In IAM role section, we will create the second IAM role I have mentioned before. This role is for Amazon Firehose.

IAM role for Amazon Firehose access to services like AWS S3 or KMS

Press "Create new or choose" button.

A new "IAM Management Console" page will be displayed. In role summary section, click on IAM Role combo box and choose "Create a new IAM Role" option.

Then type a descriptive name for your new IAM role

allow Amazon Kinesis Firehose Read and Write permissions to AWS S3 bucket

You can check "View Policy Document" to see what permissions are required for IAM role created and assigned to Firehose delivery service.

Click on Allow button which will return us to the previous screen.

IAM role assigned to Amazon Kinesis Firehose delivery stream

Click Next button for Review screen.
After you review the configuration you can now press "Create delivery stream" button.

If the Amazon Elasticsearch Domain is not yet created, you will have the error message:
Domain arn:aws:es:eu-west-3:123456789010:domain/kodyaz-elasticsearch-domain is still being created

Otherwise, if the Amazon ES domain has been created and is in "Active" status, you can successfully create your Amazon Firehose delivery stream too.

Amazon Kinesis Firehose delivery streams

Now, we are ready to test our Amazon Firehose delivery stream with sample data.
Select the stream you have created on above "Firehose delivery streams" list. "Test with demo data" button will be enabled. Click on "Test with demo data" button.

test Kinesis Firehose delivery stream with demo data

Click on "Start sending demo data" button.

A message "Demo data is being sent to your delivery stream" will be displayed.

If you close this screen, sending demo data will be stopped. So keep this page open so we can display incoming streaming data via Kibana visualization tool.
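
If you prefer to send your own test records instead of keeping the console page open, a script along the following lines could do it. This is a sketch under assumptions: the helper names, the ticker symbols and the stream name are hypothetical, and the client is expected to be a boto3 Firehose client, whose put_record_batch call accepts up to 500 records at a time.

```python
import json
import random


def make_demo_messages(count, seed=0):
    """Generate stock-like messages similar in shape to the demo data."""
    rng = random.Random(seed)
    symbols = ["QXZ", "ABC", "XYZ"]  # hypothetical ticker symbols
    return [
        {"ticker_symbol": rng.choice(symbols), "price": round(rng.uniform(10, 200), 2)}
        for _ in range(count)
    ]


def send_records(firehose_client, stream_name, messages):
    """Send messages in one batch; firehose_client is a boto3 Firehose client."""
    records = [{"Data": json.dumps(message).encode("utf-8")} for message in messages]
    return firehose_client.put_record_batch(
        DeliveryStreamName=stream_name,
        Records=records,
    )


messages = make_demo_messages(5)
print(messages)

# Usage with real AWS credentials (not executed here):
#   import boto3
#   client = boto3.client("firehose")
#   send_records(client, "my-delivery-stream", messages)  # hypothetical stream name
```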

By the way, if you go to the Amazon S3 bucket we have assigned as the backup bucket for the Amazon Firehose delivery stream, you will see data files created with the sample data.

demo data in text files on AWS S3 bucket created for Amazon Kinesis Firehose

If you check the processing-failed sub folder in your Amazon S3 bucket and see some entries, this indicates an error, possibly in the AWS Lambda function used for conversion. For example, I had the following error:

{"attemptsMade":4, "arrivalTimestamp":1568568002637, "errorCode":"Lambda.FunctionError", "errorMessage": "The Lambda function was successfully invoked but it returned an error result.", "attemptEndingTimestamp":1568568077755, "rawData":"...", "subsequenceNumber":0, "lambdaArn": "arn:aws:lambda:eu-west-3:123456789010:function:Transformation-Function:$LATEST"}

If you experience such an error, then you have probably chosen Python 3.7 as the runtime of the AWS Lambda function instead of Python 2.7.
Please edit the serverless Lambda function we have created to transform the incoming records and choose Python 2.7 as the Python runtime version.
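
Entries in the processing-failed folder are JSON lines, so they are easy to summarize with a few lines of Python. The sketch below parses the entry shown above. The function name summarize_failure is hypothetical, and I assume the two timestamp fields are milliseconds since the Unix epoch.

```python
import json
from datetime import datetime, timezone

# The failed entry from above; rawData is elided in the tutorial and left as is
error_line = (
    '{"attemptsMade":4, "arrivalTimestamp":1568568002637, '
    '"errorCode":"Lambda.FunctionError", '
    '"errorMessage": "The Lambda function was successfully invoked but it returned an error result.", '
    '"attemptEndingTimestamp":1568568077755, "rawData":"...", "subsequenceNumber":0, '
    '"lambdaArn": "arn:aws:lambda:eu-west-3:123456789010:function:Transformation-Function:$LATEST"}'
)


def summarize_failure(line):
    """Extract the most useful fields from one processing-failed entry."""
    entry = json.loads(line)
    # Assumption: timestamps are epoch milliseconds
    arrived = datetime.fromtimestamp(entry["arrivalTimestamp"] / 1000.0, tz=timezone.utc)
    return {
        "error_code": entry["errorCode"],
        "attempts": entry["attemptsMade"],
        "arrived_utc": arrived.strftime("%Y-%m-%d %H:%M:%S"),
        "elapsed_ms": entry["attemptEndingTimestamp"] - entry["arrivalTimestamp"],
        # ARN layout: arn:aws:lambda:region:account:function:name:qualifier
        "function_name": entry["lambdaArn"].split(":")[6],
    }


summary = summarize_failure(error_line)
print(summary)
```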


Check CloudWatch Logs

If you have errors related with your AWS Lambda code, you can open the AWS Lambda console and go to your Lambda function. Then switch to Monitoring tab.
In "CloudWatch Logs Insights" section, you will find a list of AWS Lambda function calls.

Amazon CloudWatch Logs Insights

You can click on LogStream link for one of the requests.

This action will take you to the CloudWatch screen for the log group of the AWS conversion Lambda function we have created in previous steps.
Below, you can see the code line which causes the error:

Amazon CloudWatch Logs for AWS Lambda function

[ERROR] TypeError: a bytes-like object is required, not 'str'
Traceback (most recent call last):
 File "/var/task/lambda_function.py", line 25, in lambda_handler
  'data': base64.b64encode(json.dumps(data_field))
 File "/var/lang/lib/python3.7/base64.py", line 58, in b64encode
  encoded = binascii.b2a_base64(s, newline=False)

As I noted before, this error is related to the Python runtime of the AWS Lambda function. Please use Python 2.7 to resolve this issue for this sample tutorial.
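
If you would rather stay on a Python 3 runtime, the cause of the TypeError is that base64.b64encode expects bytes and returns bytes in Python 3. A possible variant of the handler is sketched below. It is a minimal sketch that I have not verified against a live delivery stream; it encodes the JSON text to bytes before base64 encoding and decodes the result back to a string.

```python
import base64
import json
from datetime import datetime, timezone


def lambda_handler(event, context):
    output = []
    # Naive UTC time in ISO 8601 format, same shape as datetime.utcnow().isoformat()
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    # Loop through records in the incoming event
    for record in event['records']:
        # Extract message (json.loads accepts the decoded bytes directly)
        message = json.loads(base64.b64decode(record['data']))

        # Keep only the fields needed for reporting and add the timestamp
        data_field = {
            'timestamp': now,
            'ticker_symbol': message['ticker_symbol'],
            'price': message['price']
        }

        # Python 3: encode the JSON text to bytes before base64 encoding,
        # then turn the base64 bytes back into a string for the response
        encoded = base64.b64encode(json.dumps(data_field).encode('utf-8'))
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': encoded.decode('utf-8')
        })

    return {'records': output}
```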


Visualize Stream Data in Kibana

On Kibana URL created for the Elasticsearch domain, click on Discover menu selection.

visualize and explore stream data in Kibana

You will see following information message:
In order to visualize and explore data in Kibana, you'll need to create an index pattern to retrieve data from Elasticsearch.

And additionally a warning message is displayed on the screen:
No default index pattern. You must select or create one to continue.

As a result of missing index pattern, no Elasticsearch data is found to display at Kibana visualization tool.

Click on "Advanced Settings"

Kibana advanced settings

Search for "indexPattern:placeholder" in the "Name" column of the settings table.
Edit the value of this advanced configuration setting with the Edit link button and replace the default "logstash-*" value with the index name you have provided in the Amazon Elasticsearch Service destination section during Amazon Firehose delivery stream creation.
In our case: streaming-data-index-01

For the time field, which is the value of the "timelion:es.timefield" setting, enter "timestamp".

Now switch to the index pattern creation screen (in Kibana 6.x it is found under Management, Index Patterns).

In the "Define index pattern" step, click on the "index pattern" textbox. Choose or copy our sample index name streaming-data-index-01.
You should see a success message as follows:

create index pattern on Kibana

Continue with "Next Step" button.

On "Configure settings" step, at "Time Filter field name" combo box, choose "timestamp" field.

configure settings for Kibana

Then press "Create index pattern" button.

Now from left menu, click Visualize.
Then press "Create a visualization" button

create a new visualization on Kibana

You can choose one of the many visualization types available to display data in different views. For example, we can continue with a line chart.

Kibana visualization types among basic charts

Select index which maps to delivery stream we have created previously.

index selection for Kibana data visualization

Now we are ready to select the metrics to display our line chart for the selected index data.

Click on Y-Axis arrow and provide following sample configuration values

metrics selection for visualization on Kibana

For X-Axis under Buckets section,

X-axis selection for Kibana line chart visualization

Click on "Add sub-buckets" button to display average price value for each stock
Choose "Split Series"

split series in line chart for different stocks

Click on Apply Changes button to display streaming data values on line chart on Kibana
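
To make clear what this chart calculates, here is a small Python equivalent of the aggregation: the average price per stock for each time bucket. It is only an illustration under assumptions. I assume the X-axis is a date histogram on the timestamp field with one minute buckets, and the function name and the sample documents are hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def average_price_per_minute(documents):
    """Average price per (minute bucket, ticker symbol), like the line chart."""
    totals = defaultdict(lambda: [0.0, 0])
    for doc in documents:
        # Truncate the ISO 8601 timestamp to the start of its minute
        minute = datetime.fromisoformat(doc["timestamp"]).replace(second=0, microsecond=0)
        key = (minute.isoformat(), doc["ticker_symbol"])
        totals[key][0] += doc["price"]
        totals[key][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}


# Hypothetical documents in the shape produced by the transformation function
documents = [
    {"timestamp": "2019-09-15T17:20:02.637000", "ticker_symbol": "QXZ", "price": 10.0},
    {"timestamp": "2019-09-15T17:20:45.000100", "ticker_symbol": "QXZ", "price": 20.0},
    {"timestamp": "2019-09-15T17:21:10", "ticker_symbol": "ABC", "price": 5.0},
]
averages = average_price_per_minute(documents)
print(averages)
```

Each key corresponds to one point on the chart, and "Split Series" is what separates the points into one line per ticker symbol.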

Kibana visualization of Amazon Kinesis Firehose streaming data


Monitoring

Launch Amazon Kinesis dashboard and open the Kinesis Firehose Delivery stream we have created.
It is possible that you are still sending demo data to the stream on the same page.
Switch to "Monitoring" tab on our sample stream

There are a number of Delivery stream metrics on the screen

Amazon Kinesis Firehose delivery stream metrics to monitor
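
The same delivery stream metrics can be read outside the console from Amazon CloudWatch, where Firehose publishes them under the AWS/Firehose namespace. The sketch below only prepares the query parameters. The function name firehose_metric_query and the stream name are hypothetical, and the commented usage assumes a boto3 CloudWatch client.

```python
from datetime import datetime, timedelta, timezone


def firehose_metric_query(stream_name, metric_name, minutes=60):
    """Build parameters for CloudWatch get_metric_statistics on a delivery stream."""
    end = datetime.now(timezone.utc)
    return {
        "Namespace": "AWS/Firehose",
        "MetricName": metric_name,
        "Dimensions": [{"Name": "DeliveryStreamName", "Value": stream_name}],
        "StartTime": end - timedelta(minutes=minutes),
        "EndTime": end,
        "Period": 60,            # one data point per minute
        "Statistics": ["Sum"],
    }


# Hypothetical stream name; IncomingRecords counts records received by the stream
query = firehose_metric_query("my-delivery-stream", "IncomingRecords")
print(query)

# Usage with real AWS credentials (not executed here):
#   import boto3
#   datapoints = boto3.client("cloudwatch").get_metric_statistics(**query)["Datapoints"]
```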

On the same page, there are also tabs for Amazon Elasticsearch Service Logs and Amazon S3 Logs.
In addition to those, we have created a conversion AWS Lambda function. This Lambda function also generates logs.
Even if you don't know which Lambda function is used for conversion (or whether a conversion function is used at all), you can identify the conversion function in the Details tab on the same page.

In the section "Transform source records with AWS Lambda", if "Source record transformation" is "Enabled", you will find the AWS Lambda function name and a shortcut to the serverless function there. There is also a Monitoring tab for the AWS Lambda function.





Copyright © 2004 - 2024 Eralper YILMAZ. All rights reserved.