
Build an incremental data load solution using AWS DMS checkpoints and database logs

In this post, we explain how to utilize AWS Database Migration Service (AWS DMS) for incremental data loads without running the AWS DMS instance continuously. We demonstrate how to store the checkpoint data between each run, so that the AWS DMS task can utilize the checkpoint information and extract data from the source databases in an incremental fashion with every new run.

Users typically have to run their AWS DMS instance continuously, even in cases where the source database is populated only once a day or on a periodic schedule. Running an AWS DMS instance to continuously stream data changes in these situations isn’t cost-effective, because there aren’t any data changes to capture except when the source database is loaded per schedule. The other approach is a SQL-based data extract that uses a timestamp or a monotonically increasing column in the source tables to capture new data, but this approach is limited and has difficulty capturing updated data, deleted data, and DDL changes. SQL-based extracts also put more load on the source database servers. You can use incremental data load jobs based on AWS Glue bookmarks for data sources that aren’t supported by AWS DMS, but these jobs will encounter the same difficulties in identifying deleted records and extracting DDL changes.

AWS DMS tasks store checkpoint data internally, so that the task can be stopped and modified, and can continue data extraction without repopulating the entire data. We explain how to capture the checkpoint data and store it separately in an Amazon DynamoDB table, so that the AWS DMS task and AWS DMS instance can be successfully deleted and created again, and the extract can continue from the checkpoint of the last completed AWS DMS task.
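To make this concrete, the following is a minimal Python (boto3) sketch of how the checkpoint from a stopped AWS DMS task could be captured and persisted to DynamoDB between runs. The task ARN, DynamoDB table name, and key schema are illustrative assumptions, not the exact names used by the CloudFormation template in this post.

import boto3

dms = boto3.client("dms")
dynamodb = boto3.resource("dynamodb")

# Hypothetical names for illustration; the CloudFormation template in this
# post creates its own task and DynamoDB table names.
TASK_ARN = "arn:aws:dms:us-east-1:123456789012:task:EXAMPLETASK"
CHECKPOINT_TABLE = "dms_checkpoints"

def save_checkpoint(task_arn: str) -> str:
    """Read the native recovery checkpoint from a DMS task and store it in DynamoDB."""
    response = dms.describe_replication_tasks(
        Filters=[{"Name": "replication-task-arn", "Values": [task_arn]}]
    )
    task = response["ReplicationTasks"][0]
    checkpoint = task["RecoveryCheckpoint"]  # native checkpoint string, e.g. "checkpoint:V1#..."

    table = dynamodb.Table(CHECKPOINT_TABLE)
    table.put_item(
        Item={
            "pipeline_name": "mysql-to-aurora",   # assumed partition key for this sketch
            "checkpoint": checkpoint,
            "stop_reason": task.get("StopReason", ""),
        }
    )
    return checkpoint

if __name__ == "__main__":
    print(save_checkpoint(TASK_ARN))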

Solution overview

The following diagram shows the high-level flow of the architecture that we implement in this post.

An AWS DMS task migrates data from an Amazon Relational Database Service (Amazon RDS) for MySQL database to an Amazon Aurora PostgreSQL-Compatible Edition cluster. The AWS DMS replication instance and replication task are created using AWS Lambda functions in an AWS Step Functions workflow. The Lambda functions access an Amazon DynamoDB table to retrieve checkpoint data from the last successful data load and create a new AWS DMS task to load data from that checkpoint onwards. Additionally, you’re notified of the task completion or failure status in the Step Functions workflow itself via Amazon Simple Notification Service (Amazon SNS).

The following diagram shows further details on the Step Functions workflow.

The following steps are involved in each incremental data load:

A Lambda function creates an AWS DMS replication instance based on the configurations you provide. This allows you to customize the type of instance to be created for each data load. For example, you can use a high capacity instance for data capture during weekdays and a small capacity instance during weekends.
Another Lambda function creates an AWS DMS replication task with a custom change data capture (CDC) start option using the latest checkpoint data retrieved from the DynamoDB table (see the sketch following this list). This makes sure that the AWS DMS task captures only the CDC changes that were made after the last successful data load.
The AWS DMS task is started and monitored for completion by a Lambda function. Because the task is created with a custom CDC stop time, it captures all changes from the last successful run up to the current time and then moves to a stopped status.
Another Lambda function collects CDC metrics like the number of records inserted, deleted, or modified, along with table information and updates them in a separate DynamoDB table. This function also updates the checkpoint from the AWS DMS task into the DynamoDB table so that the next task can be created with a custom start point to continue data extraction without any overlap.
The AWS DMS task is checked for completion status and is deleted by a Lambda function when complete. You’re also notified in an email about the status of the task.
After the AWS DMS task has been deleted successfully, the AWS DMS instance is deleted by another Lambda function, which concludes the current run of the Step Functions workflow.
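As a rough illustration of the task creation and start steps above, the following boto3 sketch creates a CDC-only replication task that starts from the stored checkpoint and stops at the current time. The endpoint and instance ARNs, the task identifier, and the table mapping are placeholders, not the exact values used by the solution’s Lambda functions.

import json
from datetime import datetime, timezone

import boto3

dms = boto3.client("dms")

def create_cdc_task(checkpoint: str, instance_arn: str,
                    source_arn: str, target_arn: str) -> str:
    """Create a CDC-only task that runs from the last checkpoint up to 'now'."""
    # CdcStopPosition in DMS server_time format, e.g. server_time:2024-01-01T00:00:00
    stop_time = datetime.now(timezone.utc).strftime("server_time:%Y-%m-%dT%H:%M:%S")

    response = dms.create_replication_task(
        ReplicationTaskIdentifier="incremental-cdc-task",  # assumed identifier
        SourceEndpointArn=source_arn,
        TargetEndpointArn=target_arn,
        ReplicationInstanceArn=instance_arn,
        MigrationType="cdc",
        TableMappings=json.dumps({
            "rules": [{
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "include-studentdb",
                "object-locator": {"schema-name": "studentdb", "table-name": "%"},
                "rule-action": "include",
            }]
        }),
        CdcStartPosition=checkpoint,   # native checkpoint stored from the previous run
        CdcStopPosition=stop_time,     # stop once the task has caught up to the current time
    )
    return response["ReplicationTask"]["ReplicationTaskArn"]

The task is then started with start_replication_task and polled with describe_replication_tasks until it reaches a stopped status, at which point its new checkpoint is saved back to DynamoDB as shown earlier.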

The following steps are involved in the implementation of this solution:

Create resources using the provided AWS CloudFormation template. We have tested this CloudFormation template in the N. Virginia (us-east-1) Region. If you deploy these resources in any other Region, you may need to adjust resources such as subnets for that Region.
Connect to the MySQL database using a MySQL client from an Amazon Elastic Compute Cloud (Amazon EC2) instance and run the AWS DMS prerequisites for capturing change data.
Validate that the CDC load process captures only the incremental data.
Validate the CDC load process in an automated manner.

Incremental data loads

Transaction logs (also called binlogs, redo logs, or database logs) capture every data change made by the database management system. These changes are stored separately in log files, which have details about each transaction run on that database server and also support database recovery. Database systems use these logs to roll back uncommitted changes and reapply changes from committed transactions.

MySQL refers to its transaction logs as binlogs (row-based binary logs) and uses an incrementing number called a log sequence number (LSN) to identify transactions within the binlog. A redo record is a set of change vectors that describes the changes made to a single block in the database. During a transaction, a user process is notified of successful completion only after the necessary redo records are flushed to disk. Every time a commit happens, the database assigns an LSN to identify the redo records for each committed transaction.

Using AWS DMS for performing incremental data loads has the following benefits:

For data stores that are loaded only periodically, you can utilize AWS DMS to capture incremental data changes through database transaction logs and also enjoy the cost benefit of not running the AWS DMS replication instance continuously.
For production databases with heavy user workloads, the capability to pause and resume CDC replication can help administrators run replication tasks at off-peak hours.
AWS DMS uses transaction logs to copy incremental data, which reduces pressure on the database. Additionally, using ETL-based extraction (extract, transform, and load) for incremental data is complex and doesn’t cover all scenarios easily (such as delete situations and DDL changes).

Prerequisites

The CloudFormation template requires you to select an EC2 key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to access the Aurora cluster and RDS for MySQL database that are hosted in the private subnet. Make sure you have a key in the Region where you deploy the template. If you don’t have one, you can create a new key pair.
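If you prefer to create the key pair programmatically instead of through the console, a minimal boto3 sketch follows; the key pair name and output file are examples.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Create a new key pair and save the private key locally (example name).
response = ec2.create_key_pair(KeyName="dms-incremental-load-key")
with open("dms-incremental-load-key.pem", "w") as key_file:
    key_file.write(response["KeyMaterial"])
# Remember to restrict permissions on the .pem file (for example, chmod 400) before using it with SSH.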

Deploy your resources

Choose Launch Stack to provision the resources needed for the solution.

The following table lists the parameters to provide.

KeyName: The EC2 key pair to be configured on the EC2 instance in the public subnet. This EC2 instance is used to access the Aurora cluster and the RDS for MySQL instance in the private subnet.
VpcCIDR: The CIDR range of the VPC. You can retain the default if no other VPC in your account uses the same CIDR range.
DatabaseUsername: The user name for logging in to the RDS for MySQL and Aurora PostgreSQL instances from AWS DMS for data extraction.
DatabasePassword: The password for the RDS for MySQL and Aurora PostgreSQL database instances. Use a secure password that meets your project requirements.
DynamoDBtable: The name of the DynamoDB table that is created for capturing the checkpoint information from each run. This data is used during AWS DMS task creation for incremental data loads.
NotificationsEmail: The email address where the status of the AWS DMS load is sent after completion. This can also be a distribution list used to monitor the data loads. For this demonstration, you can use your individual email address.
UserIPAddress: The IP address, in CIDR notation, of the machine from which you access the RDS databases. It’s important to enter the correct IP address because it is added to the security group of the DB instances and determines whether the incoming connection is accepted by the RDS instance. For example, on a Mac you can run curl ipecho.net/plain and append /32 to the returned address; on Windows, run ipconfig /all, copy the IPv4 address, and append /32. Use the subnet mask applicable to your network.

It may take up to 15–20 minutes for AWS CloudFormation to deploy the resources. After the CloudFormation template has been deployed successfully, you can continue to the following steps.
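If you would rather deploy the stack from code than use the Launch Stack button, a boto3 sketch along the following lines could work. The stack name, template URL, and parameter values are placeholders; parameters not shown keep their template defaults.

import boto3

cfn = boto3.client("cloudformation", region_name="us-east-1")

# The template URL below is a placeholder; use the template referenced by
# the Launch Stack button for this post.
cfn.create_stack(
    StackName="dms-incremental-load",
    TemplateURL="https://example-bucket.s3.amazonaws.com/dms-incremental-load.yaml",
    Capabilities=["CAPABILITY_NAMED_IAM"],
    Parameters=[
        {"ParameterKey": "KeyName", "ParameterValue": "dms-incremental-load-key"},
        {"ParameterKey": "DatabaseUsername", "ParameterValue": "admin"},
        {"ParameterKey": "DatabasePassword", "ParameterValue": "ChangeMe123!"},
        {"ParameterKey": "NotificationsEmail", "ParameterValue": "you@example.com"},
        {"ParameterKey": "UserIPAddress", "ParameterValue": "203.0.113.25/32"},
    ],
)
# Wait until the stack has been created before continuing.
cfn.get_waiter("stack_create_complete").wait(StackName="dms-incremental-load")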

Connect to the source and target database endpoints and apply AWS DMS configurations

Connect to the RDS for MySQL endpoint using your preferred client tool. For this post, we use the MySQL CLI tool. Note that the IP address of the client machine from which you’re connecting must be added to the database security group to enable a connection to the databases. The CloudFormation template does this based on the input parameter UserIPAddress. If you’re accessing the database from another machine, remember to update the database security group accordingly.

Connect to your EC2 instance from a command line, as shown in the following screenshot.
Then use the following connection string to connect to the MySQL instance, and enter the password that you provided in the CloudFormation template:

mysql -h <<RDS MYSQL server name>> -u <<database user name>> -P 3306 -p

You can obtain the RDS for MySQL server name from the outputs of the CloudFormation template.

Now we can connect to the MySQL instance successfully.

Run the following queries to create a table and insert data into that table. This table is exported to the PostgreSQL target.

create table studentdb.student_details (student_id int PRIMARY KEY,name varchar(30), department varchar(30), email varchar(50),batch_year int);

insert into studentdb.student_details values (1, 'Jake', 'IT', '[email protected]',2020);
insert into studentdb.student_details values (2, 'John', 'ComputerScience', '[email protected]',2021);
insert into studentdb.student_details values (3, 'Kelly', 'Commerce', '[email protected]',2022);
insert into studentdb.student_details values (4, 'Bill', 'IT', '[email protected]',2020);
insert into studentdb.student_details values (5, 'Joe', 'ComputerScience', '[email protected]',2020);
insert into studentdb.student_details values (6, 'Steve', 'IT', '[email protected]',2021);
insert into studentdb.student_details values (7, 'Mike', 'ComputerScience', '[email protected]',2022);

Verify the data is successfully loaded into the table student_details.

Run the following commands to set the binary log (binlog) retention period and confirm the configuration:

call mysql.rds_set_configuration('binlog retention hours', 24);
call mysql.rds_show_configuration;

The output shows the configured binlog retention period. Specify the binlog retention time based on your Recovery Time Objective (RTO) requirements. We use 24 hours as the binlog retention period for the purpose of this post.

Open a new connection to the EC2 instance from another command line window and run the following command to connect to the Aurora PostgreSQL instance (refer to the CloudFormation output TargetAuroraPostgreSQLEndpoint). Enter the password when prompted (the same one that you provided in the CloudFormation template):

psql -h <<Aurora PostgreSQL endpoint>> -p 5432 -U <<database user name>> -d AuroraDB

After you’re successfully connected to the PostgreSQL instance, run the following command to list the available schemas:

select schema_name,schema_owner from information_schema.schemata;

You can verify that the studentdb schema and its tables aren’t available yet in the database; they are created as part of the AWS DMS migration.

Validate the CDC process can capture incremental data from source databases

In this section, we walk through the steps to validate the CDC process can capture incremental data from the source databases.

Run the Step Functions workflow to copy the full load data

To run the Step Functions workflow, complete the following steps:

On the Step Functions console, locate the two workflows created by the CloudFormation template.
Choose the workflow DMS-Fullload-pipeline.

Choose Start execution and wait for the workflow to complete. This workflow moves the full volume of data from the MySQL instance to the PostgreSQL instance. You don’t need to change the input parameters, because the full load workflow doesn’t require any input parameters to complete. (You can also start the execution programmatically; see the sketch after these steps.)

Validate that the job has completed successfully. At this stage, the full volume of data has been migrated to the Aurora PostgreSQL instance. The workflow uses an existing AWS DMS instance and AWS DMS task created by the CloudFormation template, which migrates the entire data set and stops after applying cached changes. You can navigate to the AWS DMS console and explore the AWS DMS task settings.

After the full load task has completed without any errors, go to the psql client window and list the schemas again. You can verify that the student_details table has been created and loaded with the seven records that were inserted earlier into MySQL. The workflow also updated the AWS DMS checkpoint in the DynamoDB table for use in subsequent CDC tasks. You can navigate to the DynamoDB console and verify that the checkpoint has been updated successfully.
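If you prefer to start the full load workflow programmatically rather than from the console, a minimal boto3 sketch follows; the state machine ARN is a placeholder that you would copy from the Step Functions console.

import boto3

sfn = boto3.client("stepfunctions")

# The state machine ARN below is a placeholder; copy the ARN of the
# DMS-Fullload-pipeline workflow from the Step Functions console.
execution = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:DMS-Fullload-pipeline",
    input="{}",  # the full load workflow doesn't require input parameters
)
print(execution["executionArn"])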

Insert records into the source MySQL database

Open the MySQL client inside the EC2 instance and run the following queries to insert CDC records into the MySQL database:

insert into studentdb.student_details values (8, 'Student8', 'ComputerScience', '[email protected]',2022);
insert into studentdb.student_details values (9, 'Student9', 'ComputerScience', '[email protected]',2022);

You can verify the records have been inserted successfully in the MySQL instance, and also query the PostgreSQL instance and see that the new records aren’t available there yet.

The following screenshot shows the newly inserted records aren’t available yet in PostgreSQL.

Run the Lambda function to initiate the CDC process

Seven new functions are listed on the Lambda console: six are used in the Step Functions workflow, and one invokes the workflow with the correct input parameters (CDC end time based on current time, AWS DMS task settings and table mapping).

To run the CDC process, choose the function dmcblog-CallDMSLambda and run it. The function starts the CDC Step Functions workflow. Again, the function doesn’t require any inputs.

Verify that the Step Functions workflow DMS-Pipeline is being run.

The Lambda function invokes the Step Functions workflow and passes the timestamp when the AWS DMS task should stop, along with task settings and table mapping settings for the AWS DMS task. Both table mapping and task settings can be passed from the code or through files within the Lambda function environment. You can copy the Lambda function and change these files to run different incremental data load pipelines within the same account.

The following screenshot shows code segments from the dmcblog-CallDMSLambda Lambda function that provide the table mapping and task settings through files within the Lambda function environment.
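For illustration, a simplified sketch of such a function is shown below. This is not the exact code from the CloudFormation template; the file names, the environment variable, and the input field names are assumptions for this sketch.

import json
import os
from datetime import datetime, timezone

import boto3

sfn = boto3.client("stepfunctions")

def lambda_handler(event, context):
    # Table mapping and task settings packaged alongside the function code
    # (file names are assumptions for this sketch).
    with open("table_mappings.json") as f:
        table_mappings = json.load(f)
    with open("task_settings.json") as f:
        task_settings = json.load(f)

    workflow_input = {
        # Ask the workflow to stop capturing changes at the current time.
        "CdcStopTime": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
        "TableMappings": table_mappings,
        "TaskSettings": task_settings,
    }

    response = sfn.start_execution(
        stateMachineArn=os.environ["CDC_STATE_MACHINE_ARN"],  # assumed environment variable
        input=json.dumps(workflow_input),
    )
    return {"executionArn": response["executionArn"]}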

Check your email to verify that you received a notification about the job completion along with CDC statistics. Note that this step might take additional time because it involves creating a new AWS DMS instance before running the AWS DMS migration task. You can monitor the progress of the workflow on the Step Functions console.

The email is sent after the load is complete and before deleting the AWS DMS instance and AWS DMS task.
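For reference, the notification step could be as simple as a single Amazon SNS publish call, sketched below; the topic ARN and message contents are illustrative, and the CloudFormation template wires the real topic to the NotificationsEmail address you provided.

import boto3

sns = boto3.client("sns")

# Topic ARN and message are placeholders for this sketch.
sns.publish(
    TopicArn="arn:aws:sns:us-east-1:123456789012:dms-load-notifications",
    Subject="AWS DMS incremental load completed",
    Message="CDC task finished: 2 inserts, 0 updates, 0 deletes.",
)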

Verify that only incremental records were populated through AWS DMS and the instance has been shut down

When the Step Functions workflow is complete, verify that the new records are populated in the Aurora PostgreSQL instance through the psql client.

You can also see that the AWS DMS replication task and replication instance are being deleted as a part of the Step Functions workflow.

Make sure the task and instance are deleted completely.

Validate the CDC load process in an automated manner

Now that we have validated that we can populate incremental records through AWS DMS, we can set up an automated pipeline to validate the AWS DMS incremental load process.

Connect to the EC2 instance and run the insert script in a periodic manner

On the Amazon EC2 console, choose the EC2 instance that was created through the CloudFormation template. This is the jump-box instance that we used earlier for connecting to the RDS databases.

When you log in to the instance, you can see a script that is available in the EC2 instance for inserting records into the MySQL database. Now we run the script periodically using a cron scheduler to verify the AWS DMS incremental load pipeline.

Run the following commands on the command line:

crontab -l > temp_cron.txt
cat <<EOF >> temp_cron.txt
*/5 * * * * python3 /home/ec2-user/insert_mysql.py
EOF
cat temp_cron.txt | crontab -
crontab -l
rm temp_cron.txt

Now we have the cron scheduler set up to run the script insert_mysql.py every 5 minutes, inserting records into the MySQL database. You can change the frequency at which the records are inserted to any interval applicable to your use case.

Run the command crontab -e to verify that the changes are stored in the cron scheduler.
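For reference, a script like insert_mysql.py could look roughly like the following sketch. This is not the exact script shipped on the EC2 instance; the connection details are placeholders and the pymysql library is assumed to be available.

import random

import pymysql  # assumed to be installed on the EC2 instance

# Connection details are placeholders; the real script on the instance is
# configured for the RDS for MySQL endpoint created by the CloudFormation stack.
connection = pymysql.connect(
    host="<<RDS MySQL server name>>",
    user="<<database user name>>",
    password="<<database password>>",
    database="studentdb",
)

try:
    with connection.cursor() as cursor:
        # Insert one new student row per run with a random ID.
        student_id = random.randint(1000, 999999)
        cursor.execute(
            "insert into student_details values (%s, %s, %s, %s, %s)",
            (student_id, f"Student{student_id}", "ComputerScience",
             f"student{student_id}@example.edu", 2022),
        )
    connection.commit()
finally:
    connection.close()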

Set up the CDC load function to be run on a schedule

We create an Amazon EventBridge rule to run the CDC load function on a schedule.

On the Amazon EventBridge console, create a new rule.
For Name, enter DMS-incremental-load.
For Event bus, choose default.
For Rule type, select Schedule.

Choose Continue to create rule. In this post, we use the legacy flow for creating the schedule. You can also use the new EventBridge Scheduler UI to create this schedule.
For Schedule pattern, select A schedule that runs at a regular rate.
For this post, we want to invoke the AWS DMS workflow every 60 minutes to populate the incremental records (the 12 records inserted by the cron job, one every 5 minutes). Alternatively, you can run the AWS DMS workflow at any frequency or pattern using the cron format. (For a programmatic alternative using the EventBridge API, see the sketch at the end of this section.)

For Target type, select AWS service.
For Select a target, choose Lambda function.
For Function, choose dmcblog-CallDMSLambda so the AWS DMS workflow is run every time with the appropriate parameters.

Choose Next.
Check your email to verify that AWS DMS loads only incremental data in a periodic fashion.
You can also query the DynamoDB CDC table using the PartiQL query editor to understand the CDC load metrics across different loads:

select TableName,load_date,Inserts,Deletes,Updates
from dms_load_cdc_metrics where (Deletes>0 or Inserts>0 or Updates>0)

The output shows that the 12 records inserted each hour by the cron scheduler have been successfully replicated to the Aurora PostgreSQL instance. You can modify this EventBridge schedule-based trigger to run at a frequency of your choice.
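If you prefer to create the schedule from code instead of the EventBridge console, a boto3 sketch along these lines could work; the Lambda function ARN is a placeholder and the permission statement ID is arbitrary.

import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

# The Lambda function ARN is a placeholder; use the ARN of the
# dmcblog-CallDMSLambda function created by the CloudFormation template.
function_arn = "arn:aws:lambda:us-east-1:123456789012:function:dmcblog-CallDMSLambda"

# Create (or update) the scheduled rule.
rule_arn = events.put_rule(
    Name="DMS-incremental-load",
    ScheduleExpression="rate(60 minutes)",
    State="ENABLED",
)["RuleArn"]

# Allow EventBridge to invoke the function, then attach it as the rule target.
lambda_client.add_permission(
    FunctionName=function_arn,
    StatementId="allow-dms-incremental-load-rule",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule_arn,
)
events.put_targets(
    Rule="DMS-incremental-load",
    Targets=[{"Id": "CallDMSLambda", "Arn": function_arn}],
)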

Considerations

Consider the following when using this solution:

If you want to run the task at a specific time every day, start the AWS DMS task workflow at least 15–20 minutes early because the AWS DMS instance provisioning takes some time.
The provisioning of the AWS DMS instance and task typically takes 15 minutes based on the Region and other factors. Therefore, running this at a 60-minute interval or less isn’t cost-efficient. Also, for this incremental load, the frequency of the pipeline creation should be decided based on the source log retention period to avoid data loss. It’s advisable to have the source log retention period be 4–5 times the frequency of the pipeline run in a production setting.
The workflow is configured to fail in case of issues, and the AWS DMS instance and task are not deleted, to allow for debugging. You’re notified through email of the failure so you can work on the issue immediately. Subsequent runs of the workflow will fail, so you avoid running the extract until the error is fixed. After the error is fixed, delete the AWS DMS task and replication instance so the next run of the AWS DMS workflow can succeed.
The RDS database instance is configured to be deployed within a single Availability Zone in the CloudFormation template. For an actual production implementation, you should use a Multi-AZ setup and enable deletion protection. Similarly, validate services like Amazon VPC, S3 buckets, IAM roles, and others before a production setup.
In this post, we configured the source database with 24-hour log retention, but you should choose a log retention time that meets your Recovery Time Objective (RTO) requirements to avoid the AWS DMS task failing due to missing log files.

Clean up

To avoid incurring ongoing charges, clean up your infrastructure by deleting the stack via the AWS CloudFormation console. Delete the EventBridge rule and any other resources you created during this exercise.

Conclusion

In this post, we demonstrated how to perform incremental data loads with AWS DMS using database transaction logs. We created the AWS DMS replication task using the AWS DMS checkpoint from the last successful load, enabling us to populate only incremental data on each run. We also deleted the AWS DMS task and replication instance after the load, thereby avoiding the cost of having an AWS DMS instance running continuously.

We also ran this entire extract pipeline as part of a Step Functions orchestration along with EventBridge scheduling, so the data extract pipeline is triggered at a frequency based on each project’s requirements.

Now, you can use AWS DMS with the workflow shown in this post to load incremental data from source databases without running the DMS instance continuously. You can try this solution and share your feedback in the comments section.

About the Authors

Sankar Sundaram is a Data Lab Architect at AWS, where he helps customers build and modernize data architectures and helps them build secure, scalable, and performant data lake, database, and data warehouse solutions.

Prabodh Pawar is a Senior Database Engineer with the AWS DMS team at AWS. He builds services and tools that help automate database migrations. Prabodh is passionate about working with customers and helping them streamline their migration journeys.
