Saturday, April 20, 2024
No menu items!
HomeDatabase ManagementStream data from Amazon DocumentDB to Amazon Kinesis Data Firehose using AWS...

Stream data from Amazon DocumentDB to Amazon Kinesis Data Firehose using AWS Lambda

In this post, we discuss how to create the data pipelines from Amazon DocumentDB (with MongoDB compatibility) to Amazon Kinesis Data Firehose and publish changes to your destination store.

Amazon DocumentDB (with MongoDB compatibility) is a scalable, highly durable, and fully managed database service for operating mission-critical JSON workloads for enterprises. Amazon DocumentDB simplifies your architecture by providing built-in security best practices, continuous backups, and native integrations with other AWS services.

Amazon Kinesis cost-effectively processes and analyzes streaming data at any scale as a fully managed service. With Kinesis, you can ingest real-time data, such as video, audio, application logs, website clickstreams, and internet of Things (IoT) telemetry data, for machine learning (ML), analytics, and other applications. Amazon Kinesis Data Firehose is a streaming extract, transform, and load (ETL) solution that reliably captures, transforms, and delivers streaming data to data lakes, data stores, and analytics services.

Solution overview

Kinesis Data Firehose can help you to load streaming data from Amazon DocumentDB into data stores and analytics tools. It can capture, transform, and load streaming data into Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service, and Splunk, enabling near-real-time analytics with existing business intelligence (BI) tools and dashboards. The same change stream can be read by multiple consumers without interfering with each other. The following diagram illustrates this architecture.

In this post, we explore the use case of how to continually archive data from Amazon DocumentDB to Amazon S3 using Kinesis Data Firehose with minimal performance impact and a minimal code approach.

The following diagram illustrates the straightforward, deployable architecture for the use case. We capture all the incoming changes as is and archive them in Amazon S3. The event source mappings (ESM) integration is provided by AWS Lambda. At a high level, Lambda ESM reads change stream events from Amazon DocumentDB and passes them to your custom Lambda function. The Lambda function processes the change stream event data and pushes information to Firehouse and finally Firehose writes the data to S3. The consumer of this data, now in a S3 bucket, would have to build in intelligence in their application to deduplicate and merge the data pertaining to the same logical unit in the source database. This gives us the flexibility to understand how a given entity changed over time, which can be useful.

However, you often need to format the data before delivering it to your target. Kinesis Data Firehose supports built-in data format conversion from raw or JSON data into formats like Apache Parquet and Apache ORC required by your destination data stores, without having to build your own data processing pipelines. Additionally, you might be using the data stored in Amazon S3 as a source for your Amazon QuickSight reports (facilitated by Amazon Athena). In this case, it might be useful to flatten the structure out for simpler querying.

In this post, we are neither transforming the records nor changing the format. We are simply archiving all changes from Amazon DocumentDB to Amazon S3 using Kinesis Data Firehose.

Prerequisites

To follow along with this post, you need to configure the following resources:

Create an AWS Cloud9 environment.
Create an Amazon DocumentDB cluster.
Install mongo shell in Cloud9.
Enable change stream on the Amazon DocumentDB cluster.
Create a secret in AWS Secrets Manager for Lambda to connect to Amazon DocumentDB.
Create a customer managed permission policy for the Lambda function.
Create a VPC endpoint for the Lambda handler
Create a VPC endpoint for Secrets Manager.
Create an S3 bucket.
Create a Firehose delivery stream with the destination as the S3 bucket.

Note: These resources will have a cost associated for creating and using them. Refer to the pricing page for more information.

Create the AWS Cloud9 environment

Open the Cloud9 console and choose Create environment.
Create an environment with the following configuration:
Under Details:
Name – DocumentDBCloud9Environment
Environment type – New EC2 instance

Under New EC2 instance:
Instance type – t2.micro (1 GiB RAM + 1 vCPU)
Platform – Amazon Linux 2
Timeout – 30 minutes

Under Network settings:
Connection – AWS Systems Manager (SSM)
Expand the VPC settings dropdown.
Amazon Virtual Private Cloud (VPC) – Choose your default VPC.
Subnet – No preference

Keep all other default settings.

Choose Create. Provisioning your new AWS Cloud9 environment can take several minutes.
Add inbound rules for cloud9 environment to default security group
Open the EC2 console. Under Network and Security, choose Security groups.
Choose default security group ID.
Under Inbound Rules choose Edit inbound rules
Choose Add rule. Create a rule with the following configuration:
Type – Custom TCP
Port range – 27017
Source – Custom
In the search box next to Source, choose the security group for the AWS Cloud9 environment you created in the previous step. To see a list of available security groups, enter cloud9 in the search box. Choose the security group with the name aws-cloud9-<environment_name>

Create an Amazon DocumentDB cluster

Open the Amazon DocumentDB console. Under Clusters, choose Create.
Create a cluster with the following configuration:
For Cluster type, choose Instance Based Cluster.
Under Configuration:
Engine version – 4.0.0
Instance class – db.t3.medium
Number of instances – 1.

Under Authentication:
Enter the Username and Password needed to connect to your cluster (same credentials as you used to create the secret in the previous step). In Confirm password, confirm your password.
Select Show advanced settings.

Under Network settings:
Virtual Private Cloud (VPC) – Choose your default VPC.
Subnet group – default
VPC security groups – default

Keep all other default settings.

Choose Create cluster. Provisioning your DocumentDB cluster can take several minutes.

Install the mongo shell

To install the mongo shell on your Cloud9 environment

Open the Cloud9 console. Next to the DocumentDBCloud9Environment environment you created earlier, choose Open under the Cloud9 IDE column.
Open a terminal window and create the MongoDB repository file with the following command:

echo -e “[mongodb-org-4.0] nname=MongoDB Repositorynbaseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/4.0/x86_64/ngpgcheck=1 nenabled=1 ngpgkey=https://www.mongodb.org/static/pgp/server-4.0.asc” | sudo tee /etc/yum.repos.d/mongodb-org-4.0.repo

Install the mongo shell with the following command:

sudo yum install -y mongodb-org-shell

Enable a change stream on the Amazon DocumentDB cluster

The change streams feature in Amazon DocumentDB provides a time-ordered sequence of change events that occur within your cluster’s collections. Applications can use change streams to subscribe to data changes on individual collections. You can enable Amazon DocumentDB change streams for all collections within a given database, or only for selected collections.

We now create a database called docdbdemo with a collection called products and activate change streams for the collection:

Open the DocumentDB console. Under Clusters, choose your cluster by choosing its cluster identifier.
To encrypt data in transit, download the public key – In the Connectivity & security tab, under Connect to this cluster with the mongo shell, choose Copy. The following command downloads a file named <<aws_region>>-bundle.pem.

wget https://truststore.pki.rds.amazonaws.com/<<region>>/<<region>>-bundle.pem

In the Connectivity & security tab, under Connect to this cluster with the mongo shell, choose Copy.
In your Cloud9 environment, paste this command into the terminal. Replace <insertYourPassword> with the correct password.
After entering this command, if the command prompt becomes rs0:PRIMARY>, then you’re connected to your Amazon DocumentDB cluster.
In the terminal window, use the following command to create a new database called docdbdemo:

use docdbdemo

Then, use the following command to insert a record into docdbdemo:

db.products.insert({“hello”:”world”})

You should see output that looks like this:

WriteResult({ “nInserted” : 1 })

Use the following command to list all databases:

show dbs

Ensure that your output contains the docdbdemo database:

docdbdemo 0.000GB

Next, activate change streams on the products collection of the docdbdemo database using the following command:

db.adminCommand({modifyChangeStreams: 1,
database: “docdbdemo”,
collection: “products”,
enable: true});

You should see output that looks like this:

{ “ok” : 1, “operationTime” : Timestamp(1680126165, 1) }

You can list change streams with the following code:

cursor = new DBCommandCursor(db,db.runCommand({aggregate:1,pipeline:[{$listChangeStreams:1}],cursor:{}}));

Create a secret in Secrets Manager to connect to DocumentDB

With Secrets Manager, you can replace hard-coded credentials in your code (including passwords) with an API call to Secrets Manager to retrieve the secret programmatically. In this post, Lambda retrieves a secret from Secrets Manager to read the changes from the Amazon DocumentDB change stream. For more information, refer to How Amazon DocumentDB (with MongoDB compatibility) uses AWS Secrets Manager.

To create your secret, complete the following steps:

On the Secrets Manager console, choose Secrets in the navigation pane.
Choose Store a new secret.
For Secret type, select Other type of secret.
For Key/value pairs, choose the Plaintext tab and enter the following:

{
“username”:”<DocumentDB change stream reader user username>“,
“password”:”<DocumentDB change stream reader user password>
}

Choose Next.
For Secret name, enter catalogDatabaseSecret.
Choose Next.
Choose Next again.
Choose Store.

Create a customer managed permission policy for the Lambda function

Open the Policies page in the IAM console and choose Create policy.
Choose the JSON tab. Copy the following policy into the editor.

{
“Version”: “2012-10-17”,
“Statement”: [
{
“Effect”: “Allow”,
“Action”: [
“ec2:CreateNetworkInterface”,
“rds:DescribeDBClusterParameters”,
“kms:Decrypt”,
“secretsmanager:GetSecretValue”,
“ec2:DescribeNetworkInterfaces”,
“rds:DescribeDBSubnetGroups”,
“ec2:DescribeVpcs”,
“ec2:DeleteNetworkInterface”,
“ec2:DescribeSubnets”,
“ec2:DescribeSecurityGroups”,
“rds:DescribeDBClusters”,
“firehose:DeleteDeliveryStream”,
“firehose:PutRecord”,
“firehose:PutRecordBatch”,
“firehose:UpdateDestination”
],
“Resource”: “*”
}
]
}

For Name, enter LambdaESMPolicy.
Choose Create policy.
The following screenshot shows the policy details on the console.

Create a VPC endpoint for the Lambda handler

Amazon DocumentDB is a VPC-only service. You need to create a VPC endpoint using Amazon Virtual Private Cloud (Amazon VPC) for the Lambda function so that it can connect to Amazon DocumentDB.

On the Amazon VPC console, choose Endpoints in the navigation pane.
Choose Create endpoint.
For Name tag, enter Lamdaendpointdemo.
For Service category, select AWS services.
Search for and select com.amazonaws.<region>.lambda.

For VPC, choose the VPC where the Amazon DocumentDB cluster is deployed.
For Subnets, select all the subnets where Amazon DocumentDB will be deployed.
For Security groups, choose the Amazon DocumentDB default security group.
Choose Create endpoint.

Create a VPC endpoint for Secrets Manager

Complete the following steps to create a VPC endpoint for Secrets Manager:

On the Amazon VPC console, choose Endpoints in the navigation pane.
Choose Create endpoint.
For Name tag, enter SMendpointdemo.
For Service category, select AWS services.
Search for and select com.amazonaws.<region>.secretsmanager.

For VPC, choose the VPC where the Amazon DocumentDB cluster is deployed.
For Subnets, select all the subnets where Amazon DocumentDB will be deployed.
For Security groups, choose the Amazon DocumentDB default security group.
Choose Create endpoint.
Confirm that the status for both endpoints shows as Available.

Create an S3 bucket

Amazon S3 is an object storage service built to store and retrieve any amount of data from anywhere. It offers industry-leading durability, availability, performance, security, and virtually unlimited scalability at very low costs. For this post, you need to create an S3 bucket for Kinesis Data Firehose to store the Amazon DocumentDB changes.

Create a Firehose delivery stream with the destination as Amazon S3

Lastly, you need to create and configure a delivery stream in Kinesis Data Firehose and define the destination as the S3 bucket that you created in the previous step.

On the Kinesis console, choose Kinesis Data Firehose in the navigation pane.

Choose Create delivery stream.
For Source, choose Direct PUT.
For Destination, choose Amazon S3.
For Delivery stream name, enter docdb-backup-ds.

Under Destination settings, for S3 bucket, enter the path for the bucket you created in step “Create an S3 bucket”.
Under Destination settings, for S3 bucket error output prefix – optional enter docdbdemo_products_backups_

The following configurations are optional:
You can choose a buffer size (1–128 MB) or buffer interval (60–900 seconds). Whichever condition is satisfied first triggers data delivery to Amazon S3. Configure this depending on how fast you want your data to be delivered. In this post we use1MB and 60 seconds.
Compression is good for cost savings in Amazon S3 if you have use cases where a lot of data will be archived in your buckets. However, your application reading that data will need to decompress the data on its end, so an added overhead must be considered. If you’re using Athena for reporting on that data, note that only Snappy and Gzip are supported.
You can enable the destination error logs to log any delivery errors to Amazon CloudWatch as well as the Kinesis Data Firehose console.
Under Advanced settings, for Amazon CloudWatch error logging, make sure it is Enabled.

Leave all other options as default and create the delivery stream.

Stream changes from Amazon DocumentDB to Kinesis Data Firehose

To stream changes from Amazon DocumentDB to Kinesis Data Firehose, you must first create your Lambda function, then add Amazon DocumentDB as a trigger for the function.

Create a Lambda function

You need to create a Lambda function to read the changes from the stream and push them to a Firehose delivery stream. In this post we create a function called ProcessDocumentDBRecords using the following code snippet (using Python 3.8 runtime):

import json
import boto3

# Consider using env variables for STREAM_NAME to make it easier to change things in the future
STREAM_NAME = “docdb-backup-ds”

def lambda_handler(event, context):
# TODO implement
kinesis_client = boto3.client(‘firehose’, region_name=’<region>‘)
kinesis_client.put_record(
DeliveryStreamName=STREAM_NAME,
Record={ ‘Data’ : json.dumps(event)})
#Remember to change the region in code above

Your Lambda function is now created with a new execution role. Now we must edit the function’s configuration and update its role with the policy created earlier.

Open your Lambda function and select the Configuration tab.
Select Properties on the left and open the link under Role name

A new tab will open the IAM console showing the Lambda function role.
Select Add permissions and choose Attach policies
Add the custom policy created in Create a customer managed permission policy for the Lambda functionLambdaESMPolicy

Define Amazon DocumentDB as a trigger for the Lambda function

After you create the function and update the policy with the Lambda role, you can add a trigger. Complete the following steps:

On the Lambda console, navigate to your function.
Navigate to Configuration, Choose Triggers in the navigation pane.
Choose Add trigger.
Under Trigger configuration, choose DocumentDB.
Select your DocumentDB cluster.
For Database name, enter a name (docdbdemo for this post).
For Collection name, enter an optional name(products for this post).
Change Batch size to 1.
The Batch size option allows you to set the maximum number of messages to retrieve in a single batch, up to 10,000. The default batch size is 100. This could have an impact if you’re implementing complex transformations using the function code. This will also factor into the Lambda function timeout settings—you must be able to process the events within the Lambda function timeout settings.
Leave Starting position as Latest.
You have three options for Starting position:

Latest – Process only new records that are added to the stream. This will only pick up changes after your event source is created successfully.
Trim horizon – Start from the beginning of the available change streams. This is typically used if you start your change stream before you deploy your Lambda function or if for some reason want to replay all the available changes.
At timestamp – Process records starting from a specific time. This timestamp needs to be chosen according to the change stream retention period.

For Secrets Manager key, select the secret you created earlier.
For Full document configuration, choose UpdateLookup.
The Default option should be used if you want to capture only the changes being made in that operation (for example, only a few fields are updated). The UpdateLookup option should be used when you need the current state of the entire document. Note that UpdateLookup needs to fetch the current state of the document and therefore has a performance penalty. If using this option, you will need to test your workload and gauge the degree of impact.
Choose Add.

Make sure the trigger state is enabled and there is no error. It will take a few moments to create the trigger. After it has been created the state will be “enabled.”

Test the setup

To test the setup log in to the Amazon DocumentDB cluster from the Cloud9 terminal, launch mongo shell and insert some data:

use docdbdemo;
db.products.insertMany( [
{
“name”: “Handbag”,
“sku”: “3451290”,
“description”: “Fashion Hand bags for all ages”,
“inventory”: 75
},
{
“name”: “Round hat”,
“sku”: “8976045”,
“inventory”: 200
},
{
“name”: “Polo shirt”,
“sku”: “6497023”,
“description”: “Cool shirts for hot summer”,
“inventory”: 25
},
{
“name”: “Swim shorts”,
“sku”: “8245352”,
“description”: “Designer swim shorts for athletes”,
“inventory”: 200
},
{
“name”: “Running shoes”,
“sku”: “3243662”,
“description”: “Shoes for work out and trekking”,
“inventory”: 20
}
]);

When Kinesis Data Firehose sends data files over, the data inside the bucket is prefixed in the manner shown in following screenshot: <year>/<month>/<date>/<hour>. Kinesis Data Firehose automatically manages the prefixing. Each file represents a Kinesis firehose batch sent over by the delivery stream as per the batching settings.

After a few minutes, check on the Amazon S3 console. Select your S3 bucket and look for an object named docdbdemo_products_backups_2023 and then hour of the day prefix as shown in screenshot above. You can download the file and check its contents. You will notice there are five events corresponding to the five inserts:

{
“eventSourceArn”:”arn:aws:rds:us-east-1:XXXXXXXXX:cluster:XXXXXX”,
“events”:[
{
“event”:{
“_id”:{
“_data”:”01643337b2000000070100000007000041e1″
},
“clusterTime”:{
“$timestamp”:{
“t”:1681078194,
“i”:7
}
},
“documentKey”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91b”
}
},
“fullDocument”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91b”
},
“name”:”Handbag”,
“sku”:”3451290″,
“description”:”Fashion Hand bags for all ages”,
“inventory”:75.0
},
“ns”:{
“db”:”docdbdemo”,
“coll”:”products”
},
“operationType”:”insert”
}
}
],
“eventSource”:”aws:docdb”
}{
“eventSourceArn”:”arn:aws:rds:us-east-1:XXXXXXXXX:cluster:XXXXXX”,
“events”:[
{
“event”:{
“_id”:{
“_data”:”01643337b2000000080100000008000041e1″
},
“clusterTime”:{
“$timestamp”:{
“t”:1681078194,
“i”:8
}
},
“documentKey”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91c”
}
},
“fullDocument”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91c”
},
“name”:”Round hat”,
“sku”:”8976045″,
“inventory”:200.0
},
“ns”:{
“db”:”docdbdemo”,
“coll”:”products”
},
“operationType”:”insert”
}
}
],
“eventSource”:”aws:docdb”
}{
“eventSourceArn”:”arn:aws:rds:us-east-1:XXXXXXXXX:cluster:XXXXXX”,
“events”:[
{
“event”:{
“_id”:{
“_data”:”01643337b2000000090100000009000041e1″
},
“clusterTime”:{
“$timestamp”:{
“t”:1681078194,
“i”:9
}
},
“documentKey”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91d”
}
},
“fullDocument”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91d”
},
“name”:”Polo shirt”,
“sku”:”6497023″,
“description”:”Cool shirts for hot summer”,
“inventory”:25.0
},
“ns”:{
“db”:”docdbdemo”,
“coll”:”products”
},
“operationType”:”insert”
}
}
],
“eventSource”:”aws:docdb”
}{
“eventSourceArn”:”arn:aws:rds:us-east-1:XXXXXXXXX:cluster:XXXXXX”,
“events”:[
{
“event”:{
“_id”:{
“_data”:”01643337b20000000a010000000a000041e1″
},
“clusterTime”:{
“$timestamp”:{
“t”:1681078194,
“i”:10
}
},
“documentKey”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91e”
}
},
“fullDocument”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91e”
},
“name”:”Swim shorts”,
“sku”:”8245352″,
“description”:”Designer swim shorts for athletes”,
“inventory”:200.0
},
“ns”:{
“db”:”docdbdemo”,
“coll”:”products”
},
“operationType”:”insert”
}
}
],
“eventSource”:”aws:docdb”
}{
“eventSourceArn”:”arn:aws:rds:us-east-1:XXXXXXXXX:cluster:XXXXXX”,
“events”:[
{
“event”:{
“_id”:{
“_data”:”01643337b20000000b010000000b000041e1″
},
“clusterTime”:{
“$timestamp”:{
“t”:1681078194,
“i”:11
}
},
“documentKey”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91f”
}
},
“fullDocument”:{
“_id”:{
“$oid”:”643337b2880271bd6ef9c91f”
},
“name”:”Running shoes”,
“sku”:”3243662″,
“description”:”Shoes for work out and trekking”,
“inventory”:20.0
},
“ns”:{
“db”:”docdbdemo”,
“coll”:”products”
},
“operationType”:”insert”
}
}
],
“eventSource”:”aws:docdb”
}

Limitations

Amazon S3 has a limitation of 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in an S3 bucket. Because Kinesis Data Firehose batches and sends the data as a single file, this limit is good enough for most systems.

By default, each Firehose delivery stream can take in up to 2,000 transactions per second, 5,000 records per second, or 5 MB per second. If you use PutRecord and PutRecordBatch, the limits are an aggregate across these two operations for each delivery stream. For more information about limits and how to request an increase, refer to Amazon Kinesis Data Firehose Quota. You must specify the name of the delivery stream and the data record when using PutRecord. The data record consists of a data blob that can be up to 1,000 KiB in size, and any kind of data. For example, it can be a segment from a log file, geographic location data, website clickstream data, and so on.

Clean up

To clean up the resources, delete them in the following order:

Delete the Lambda function and its associated role.
Delete the Firehose delivery stream.
Delete the S3 bucket.
Delete the VPC endpoints for the Lambda handler and Secrets Manger.
Delete the secret from Secrets Manger.
Delete the custom policy.
Delete the Amazon DocumentDB cluster.
Delete the Cloud9 environment.

Summary

In this post, we discussed how to run and configure an Amazon DocumentDB trigger for Lambda to continuously stream data changes from Amazon DocumentDB to downstream services. You can use this solution for a variety of use cases, such as archiving and backing up data, copying data to OpenSearch Service for text searches, or copying data over to Amazon Redshift for data warehouse use cases. We showed how to create an Amazon DocumentDB cluster and enable a change stream on a collection, and then create a Lambda function that writes the data over to a Firehose delivery stream that has an S3 bucket as its destination. The Lambda function has a trigger that runs the function whenever a new change appears on the Amazon DocumentDB change stream. Finally, we demonstrated how to issue some DML changes on the source (Amazon DocumentDB) and check the data on the target (S3 bucket).

Leave a comment. We’d love to hear your thoughts and suggestions.

About the Authors

Anshu Vajpayee is a Senior DocumentDB Specialist Solutions Architect at Amazon Web Services (AWS). He has been helping customers to adopt NoSQL databases and modernize applications leveraging Amazon DocumentDB. Before joining AWS, he worked extensively with relational and NoSQL databases.

Sourav Biswas is a Senior DocumentDB Specialist Solutions Architect at Amazon Web Services (AWS). He has been helping AWS DocumentDB customers successfully adopt the service and implement best practices around it. Before joining AWS, he worked extensively as an application developer and solutions architect for various noSQL vendors

Read MoreAWS Database Blog

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments