Sunday, April 14, 2024

Create and run AWS DMS tasks using AWS Step Functions

AWS Database Migration Service (AWS DMS) is a managed service that helps you migrate databases to AWS easily and securely. It supports various database sources and targets, including Amazon Relational Database Service (Amazon RDS), Amazon Aurora, Amazon Redshift, Amazon Simple Storage Service (Amazon S3), and more. With AWS DMS, you can migrate your data to and from most widely used commercial and open source databases.

AWS Step Functions is a serverless workflow orchestration service that lets you coordinate multiple AWS services into serverless workflows and visualize them. You define the flow and conditions of a workflow in a JSON-based language called Amazon States Language, either directly or through a graphical interface.

In this post, we explore how you can use Step Functions to create and orchestrate AWS DMS tasks.

Solution overview

For this post, we use an S3 bucket as the source and Amazon Aurora PostgreSQL-Compatible Edition as the target database instance. You can automate the AWS DMS task creation by integrating with AWS Lambda and Step Functions. This solution provides an end-to-end pipeline to migrate the data in an automated way. By using Step Functions for AWS DMS task creation, you can simplify the process and gain the following benefits:

Automation – You can create a workflow that automates the entire AWS DMS task-creation process, reducing manual effort and potential errors.
Orchestration – You can define the flow and dependencies between steps, making it straightforward to manage complex migration scenarios.
Monitoring – Step Functions provides built-in logging and monitoring, giving you insights into the progress and status of each step in the replication process. This helps you identify bottlenecks, troubleshoot issues, and monitor the health of your migration.

The following diagram illustrates this architecture.

The workflow includes the following steps:

A user uploads CSV files to the S3 bucket.
When an object is uploaded to the S3 bucket, the s3:ObjectCreated:Put event invokes a Lambda function.
The Lambda function invokes a Step Functions state machine using the StartExecution API call.
The state machine orchestrates the creation of the AWS DMS task and initiates a full load from Amazon S3 to Amazon Aurora PostgreSQL.

Prerequisites

To get started, you must complete the following prerequisites:

Create an S3 bucket using the AWS Management Console or the AWS Command Line Interface (AWS CLI).
Set up an AWS DMS replication instance in the same Region as your source and target.
Create an Aurora PostgreSQL cluster.
Confirm network connectivity between the AWS DMS replication instance and Aurora PostgreSQL instance.
Ensure that the data stored in your S3 bucket is in a format compatible with AWS DMS. A common source format is CSV.
Create AWS Identity and Access Management (IAM) roles and policies that grant AWS DMS permissions to access Amazon S3.
Create IAM roles and policies that grant the Step Functions state machine permissions to access AWS DMS.
Ensure that you have configured an Amazon Simple Notification Service (Amazon SNS) topic for task status notifications.
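
As a rough sketch of the state machine permissions in the preceding list, the following Python builds an IAM policy document covering the AWS DMS and Amazon SNS actions that the state machine in this post calls. The function name and the "*" resource for the AWS DMS actions are assumptions made for illustration, not a recommendation; scope the resources more narrowly where you can.

```python
def build_state_machine_policy(sns_topic_arn):
    """Return an IAM policy document (as a dict) for the state machine role.

    Hypothetical helper: the action list mirrors the API calls made by the
    workflow in this post. The "*" resource is an assumption for brevity.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "dms:CreateReplicationTask",
                    "dms:DescribeReplicationTasks",
                    "dms:StartReplicationTask",
                    "dms:DeleteReplicationTask",
                ],
                "Resource": "*",
            },
            {
                # Only the status topic needs to be publishable.
                "Effect": "Allow",
                "Action": ["sns:Publish"],
                "Resource": sns_topic_arn,
            },
        ],
    }
```

Serialize the result with json.dumps before attaching it to the role. The topic ARN you pass in should be the one the state machine publishes to.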

In the following sections, we provide a step-by-step guide for automating the AWS DMS task.

Create source and target AWS DMS endpoints

We use AWS DMS endpoints to define the source and target databases in a migration task. Endpoints contain the information needed to connect to the databases, such as server names, credentials, ports, and database names. To create the source and target endpoints, you can use the AWS DMS console or the AWS CLI.

When you use Amazon S3 as a source, the source data files must be in CSV format and stored under a path of the form <folder-name>/schemaName/tableName/LOAD001.csv. For example, suppose that your data files are in the S3 path s3://<bucketName>/dmsfolder/dbo/ratings/ratings.csv. Here, dmsfolder is the bucket folder, dbo is the schema name, and ratings is the table name. The following screenshot shows an example of the data in the ratings.csv file.
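
To make the path layout concrete, here is a minimal Python sketch that splits an object key into its folder, schema, table, and file name. It assumes a single-level bucket folder, as in the dmsfolder example; the SourceTable type and parse_source_key function are hypothetical names, not part of AWS DMS.

```python
from typing import NamedTuple


class SourceTable(NamedTuple):
    """Parts of an S3 object key laid out as <folder>/<schema>/<table>/<file>."""
    folder: str
    schema: str
    table: str
    filename: str


def parse_source_key(key):
    """Split a key such as dmsfolder/dbo/ratings/ratings.csv into its parts.

    Assumes exactly one folder level above the schema; raises ValueError
    for any other layout or for a file that is not a .csv file.
    """
    parts = key.strip("/").split("/")
    if len(parts) != 4 or not parts[3].lower().endswith(".csv"):
        raise ValueError(f"unexpected key layout: {key!r}")
    return SourceTable(*parts)
```

A check like this can be run before uploading, so that a misplaced file is caught early rather than silently skipped by the selection rules.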

The following screenshot shows example settings for creating a target endpoint on the AWS DMS console. When using a PostgreSQL database as a target, provide the necessary details such as the PostgreSQL server endpoint, database credentials, and other connection settings.

Create an IAM role to invoke Step Functions

Create an IAM role that allows a Lambda function to invoke a Step Functions state machine. For this post, the state machine is called DMS-S3-postgres-stepFunctions. The Lambda execution role needs permission for the states:StartExecution action on that state machine. See the following code:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "states:StartExecution"
      ],
      "Resource": [
        "arn:aws:states:us-east-1:**********:stateMachine:DMS-S3-postgres-stepFunctions"
      ]
    }
  ]
}
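
If you prefer to script this step, the following Python is one possible sketch: it builds the same policy document and attaches it inline to an existing role through the IAM put_role_policy call. The iam_client argument is expected to be boto3.client("iam"); the helper names, the role name, and the default policy name are hypothetical.

```python
import json


def start_execution_policy(state_machine_arn):
    """Policy document allowing only states:StartExecution on one state machine."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["states:StartExecution"],
                "Resource": [state_machine_arn],
            }
        ],
    }


def attach_inline_policy(iam_client, role_name, state_machine_arn,
                         policy_name="InvokeDmsStateMachine"):
    """Attach the policy inline to an existing Lambda execution role.

    role_name and policy_name are placeholder names for this sketch.
    """
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        # The IAM API expects the policy document as a JSON string.
        PolicyDocument=json.dumps(start_execution_policy(state_machine_arn)),
    )
```

Passing the client in as an argument keeps the function easy to exercise with a stub before pointing it at a real account.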

Create the Step Functions state machine

To create a Step Functions workflow that creates an AWS DMS replication task, use the Step Functions console or the AWS CLI to create the state machine DMS-S3-postgres-stepFunctions.

Provide the parameters required to create an AWS DMS task, such as the source and target endpoint ARNs, the replication instance ARN, and the table-mapping selection rules. In this post, these values are set in the Parameters field of the CreateReplicationTask state rather than passed in as execution input.
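
For comparison, the same task parameters can be assembled in Python for the Boto3 create_replication_task call. This is a sketch under assumptions: the helper names are hypothetical, and the ARNs are whatever your own endpoints and replication instance use. Note that the AWS DMS API takes the table mappings as a JSON string.

```python
import json


def selection_rule(schema, table="%", rule_id="1"):
    """Build one selection rule that includes matching tables in a schema."""
    return {
        "rule-type": "selection",
        "rule-id": rule_id,
        "rule-name": rule_id,
        "object-locator": {"schema-name": schema, "table-name": table},
        "rule-action": "include",
    }


def create_task_kwargs(task_id, source_arn, target_arn, instance_arn, schema):
    """Keyword arguments for the AWS DMS CreateReplicationTask API."""
    return {
        "ReplicationTaskIdentifier": task_id,
        "SourceEndpointArn": source_arn,
        "TargetEndpointArn": target_arn,
        "ReplicationInstanceArn": instance_arn,
        "MigrationType": "full-load",
        # Serialized to a string because the API does not accept a nested object.
        "TableMappings": json.dumps({"rules": [selection_rule(schema)]}),
    }
```

With a client created as boto3.client("dms"), the result could be passed as dms_client.create_replication_task(**kwargs).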

The following screenshot shows an example using the console, in which you use an empty template and enter the state machine code.

We use the following state machine code for this post:

{
  "Comment": "A description of my state machine",
  "StartAt": "CreateReplicationTask",
  "States": {
    "CreateReplicationTask": {
      "Type": "Task",
      "Next": "Wait for Create",
      "Parameters": {
        "MigrationType": "full-load",
        "ReplicationInstanceArn": "arn:aws:dms:us-east-1:**********02:rep:BU547HLPVA7TGKWOJACENAUPKM",
        "ReplicationTaskIdentifier": "Myblog",
        "SourceEndpointArn": "arn:aws:dms:us-east-1:**********02:endpoint:RLQI6XJUJW56TSJVEWRQOK6R6BAGO45BIDEFG2Y",
        "TableMappings": {
          "rules": [
            {
              "rule-type": "selection",
              "rule-id": "1",
              "rule-name": "1",
              "object-locator": {
                "schema-name": "dbo",
                "table-name": "%"
              },
              "rule-action": "include"
            }
          ]
        },
        "TargetEndpointArn": "arn:aws:dms:us-east-1:**********02:endpoint:UM2W53TDKBTP5U742KDUADE4LEULCCP5VC6ZVLY"
      },
      "Resource": "arn:aws:states:::aws-sdk:databasemigration:createReplicationTask"
    },
    "Wait for Create": {
      "Type": "Wait",
      "Seconds": 70,
      "Next": "DescribeReplicationTask"
    },
    "DescribeReplicationTask": {
      "Type": "Task",
      "Next": "Get Task Status",
      "Parameters": {
        "Filters": [
          {
            "Name": "replication-task-arn",
            "Values.$": "States.Array($.ReplicationTask.ReplicationTaskArn)"
          }
        ],
        "Marker": "",
        "MaxRecords": 20,
        "WithoutSettings": false
      },
      "Resource": "arn:aws:states:::aws-sdk:databasemigration:describeReplicationTasks",
      "ResultSelector": {
        "Status.$": "$.ReplicationTasks[0].Status",
        "ReplicationTaskArn.$": "$.ReplicationTasks[0].ReplicationTaskArn"
      }
    },
    "Get Task Status": {
      "Type": "Choice",
      "Choices": [
        {
          "Or": [
            {
              "Variable": "$.Status",
              "StringEquals": "stopped"
            },
            {
              "Variable": "$.Status",
              "StringEquals": "ready"
            }
          ],
          "Next": "StartReplicationTask"
        },
        {
          "Variable": "$.Status",
          "StringEquals": "failed",
          "Next": "SNS Publish Failed"
        }
      ],
      "Default": "StartReplicationTask"
    },
    "StartReplicationTask": {
      "Type": "Task",
      "Parameters": {
        "ReplicationTaskArn.$": "$.ReplicationTaskArn",
        "StartReplicationTaskType": "reload-target"
      },
      "Resource": "arn:aws:states:::aws-sdk:databasemigration:startReplicationTask",
      "Next": "Wait for Start",
      "ResultSelector": {
        "Status.$": "$.ReplicationTask.Status",
        "ReplicationTaskArn.$": "$.ReplicationTask.ReplicationTaskArn"
      }
    },
    "Wait for Start": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "Get Task load Status"
    },
    "Get Task load Status": {
      "Type": "Choice",
      "Choices": [
        {
          "Or": [
            {
              "Variable": "$.Status",
              "StringMatches": "starting"
            },
            {
              "Variable": "$.Status",
              "StringMatches": "running"
            }
          ],
          "Next": "DescribeReloadStatus"
        },
        {
          "Variable": "$.Status",
          "StringMatches": "failed",
          "Next": "SNS Publish Failed"
        },
        {
          "Or": [
            {
              "Variable": "$.Status",
              "StringMatches": "stopped"
            },
            {
              "Variable": "$.Status",
              "StringMatches": "ready"
            }
          ],
          "Next": "SNS Publish"
        }
      ],
      "Default": "SNS Publish"
    },
    "DescribeReloadStatus": {
      "Type": "Task",
      "Next": "Wait for Start",
      "Parameters": {
        "Filters": [
          {
            "Name": "replication-task-arn",
            "Values.$": "States.Array($.ReplicationTaskArn)"
          }
        ],
        "Marker": "",
        "MaxRecords": 20,
        "WithoutSettings": false
      },
      "Resource": "arn:aws:states:::aws-sdk:databasemigration:describeReplicationTasks",
      "ResultSelector": {
        "Status.$": "$.ReplicationTasks[0].Status",
        "ReplicationTaskArn.$": "$.ReplicationTasks[0].ReplicationTaskArn"
      }
    },
    "SNS Publish": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:**********02:CaseTestSNS",
        "Message.$": "$"
      },
      "Next": "DeleteReplicationTask",
      "ResultPath": null
    },
    "SNS Publish Failed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:**********02:CaseTestSNS",
        "Message.$": "$"
      },
      "ResultPath": null,
      "End": true
    },
    "DeleteReplicationTask": {
      "Type": "Task",
      "End": true,
      "Parameters": {
        "ReplicationTaskArn.$": "$.ReplicationTaskArn"
      },
      "Resource": "arn:aws:states:::aws-sdk:databasemigration:deleteReplicationTask"
    }
  },
  "TimeoutSeconds": 300
}
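
Before pasting a definition like this into the console, it can help to check it locally. The following Python is a small sketch, not an official validator: it parses the definition as JSON (which fails on typographic quotation marks picked up from a web page) and reports any Next or Default target that names a state that does not exist. The function names are hypothetical, and only top-level states are inspected, which is sufficient for a definition with no Parallel or Map states.

```python
import json


def undefined_targets(definition):
    """Return a sorted list of transition targets that are not defined states."""
    states = definition["States"]
    targets = {definition["StartAt"]}
    for state in states.values():
        # Task and Wait states use Next; Choice states use Default.
        for key in ("Next", "Default"):
            if key in state:
                targets.add(state[key])
        # Each top-level Choice rule carries its own Next.
        for rule in state.get("Choices", []):
            if "Next" in rule:
                targets.add(rule["Next"])
    return sorted(targets - set(states))


def check_definition_file(path):
    """Load a definition from a file and return its undefined targets."""
    with open(path, encoding="utf-8") as handle:
        return undefined_targets(json.load(handle))
```

An empty list means every transition points at a defined state; it does not prove that the workflow behaves as intended at run time.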

Create an IAM role for the Lambda function to access the S3 bucket

Create an IAM role that provides the necessary permissions for a Lambda function to access and interact with S3 buckets. This role is commonly used when creating Lambda functions that are triggered by S3 events or need to perform operations on S3 objects. For more information, refer to How do I allow my Lambda function access to my Amazon S3 bucket?

Create a Lambda function

To invoke Step Functions from a Lambda function, you can use an AWS SDK such as the AWS SDK for Python (Boto3) to call the Step Functions API. Use the start_execution method of the Step Functions client to start the state machine, providing the Amazon Resource Name (ARN) of the state machine and the input data.

On the Lambda console, create your Lambda function using the following code (provide the ARN of the state machine you created earlier):

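import boto3

print('Loading function')

# ARN of the state machine created earlier (the account ID is masked here).
STEPFUNCTION_ARN = 'arn:aws:states:us-east-1:************:stateMachine:DMS-S3-postgres-stepFunctions'
# Create the client outside the handler so that warm invocations reuse it.
step_functions_client = boto3.client('stepfunctions', region_name='us-east-1')


def lambda_handler(event, context):
    # Start one execution per S3 event; the workflow does not read its input.
    step_functions_client.start_execution(stateMachineArn=STEPFUNCTION_ARN, input='{}')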
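
The handler above starts the execution with an empty input. If you would like the execution history to record which object triggered the run, one option is to pass the bucket and key along, as in the following sketch. The build_execution_input function and the "bucket" and "key" field names are hypothetical, and the state machine in this post does not read them.

```python
import json
import urllib.parse


def build_execution_input(event):
    """Return a JSON string naming the first uploaded object in an S3 event."""
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    # Object keys arrive URL-encoded in S3 event notifications,
    # so a space in the key shows up as "+".
    key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
    return json.dumps({"bucket": bucket, "key": key})
```

Inside the handler, this would replace the literal '{}', for example start_execution(stateMachineArn=STEPFUNCTION_ARN, input=build_execution_input(event)).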

Create an S3 event notification

With the S3 event notification in place, whenever an object is created in or uploaded to the specified S3 bucket, Amazon S3 invokes the Lambda function with the event information as input.

To enable Lambda notifications using the Amazon S3 console, complete the following steps:

On the Amazon S3 console, navigate to your bucket.
Choose the Properties tab.
In the Event notification section, choose Create event notification.
For Prefix, enter dmsfolder/dbo/ratings/.
For Suffix, enter .csv.
In the Event types section, for Object creation, select Put.

If you upload a large file, use a multipart upload and select the s3:ObjectCreated:CompleteMultipartUpload event.

For Destination, select Lambda function.
For Specify Lambda function, you can choose from an existing function or create a new one. For this post, we select Enter Lambda function ARN and enter the ARN.
Choose Save changes.

For more information, refer to Enabling and configuring event notifications using the Amazon S3 console.
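
The same notification can also be expressed in code. The following Python sketch builds the configuration structure that the Boto3 S3 client accepts in put_bucket_notification_configuration, using the prefix, suffix, and event type from the console steps. The function name is hypothetical. When you configure the notification through the API instead of the console, the Lambda function also needs a resource-based permission that allows Amazon S3 to invoke it.

```python
def lambda_notification_config(lambda_arn, prefix, suffix,
                               events=("s3:ObjectCreated:Put",)):
    """Build a NotificationConfiguration dict for one Lambda destination."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": list(events),
                "Filter": {
                    "Key": {
                        # Only keys matching both rules trigger the function.
                        "FilterRules": [
                            {"Name": "prefix", "Value": prefix},
                            {"Name": "suffix", "Value": suffix},
                        ]
                    }
                },
            }
        ]
    }
```

With a client created as boto3.client("s3"), the result could be applied as s3_client.put_bucket_notification_configuration(Bucket=bucket_name, NotificationConfiguration=config). For large files uploaded in parts, add "s3:ObjectCreated:CompleteMultipartUpload" to the events tuple.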

Upload the CSV file into Amazon S3

To upload a data file into Amazon S3, you can use the Amazon S3 console or the AWS CLI. For more information, refer to Uploading objects.

After you upload the file, you can observe that the creation of the AWS DMS full load task is in progress. The task loads the data from the CSV file you uploaded to Amazon S3 into the Aurora PostgreSQL database instance. To verify the results, connect to the target database and run a query on the ratings table.

Clean up

To clean up and prevent further costs, complete the following steps:

Drop the database tables in the target data stores.
Delete the AWS DMS replication instance.
Delete the S3 bucket and CSV files that you uploaded to the S3 bucket.
Delete the Lambda function.
Delete the Step Functions state machine.

Conclusion

When using Step Functions for creating AWS DMS tasks, you can achieve better control, visibility, flexibility, and scalability in managing your database migration process. It enables you to build robust and resilient migration workflows that can be customized to meet your specific requirements. With the complexity involved in database migrations, we highly recommend testing the migration steps in non-production environments prior to making changes in production.

If you have any questions or suggestions about this post, leave a comment.

About the authors

Sudhakar Darse is a Database Specialist Solutions Architect at AWS. He works with AWS customers to provide guidance and technical assistance on database services, helping them with database migrations to the AWS Cloud and improving the value of their solutions when using AWS.

Vijaya Battula is a Cloud Support Engineer who has worked with AWS for 3+ years. His expertise lies in providing an outstanding customer experience on the AWS Cloud, supporting external clients with tasks such as troubleshooting serverless technologies like Step Functions, API Gateway, Lambda, and SWF, and improving public service documentation, internal documentation, and articles.

Sarabjeet Singh is a Database Specialist Solutions Architect at Amazon Web Services. He works with our customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.

Read more: AWS Database Blog
