Define and run Machine Learning pipelines on Step Functions using Python, Workflow Studio, or States Language

You can use various tools to define and run machine learning (ML) pipelines or DAGs (Directed Acyclic Graphs). Some popular options include AWS Step Functions, Apache Airflow, KubeFlow Pipelines (KFP), TensorFlow Extended (TFX), Argo, Luigi, and Amazon SageMaker Pipelines. All these tools let you compose pipelines in various languages (JSON, YAML, Python, and more) and then view and run them using a workflow orchestrator. A deep comparison of these options is out of scope for this post; it would involve selecting and benchmarking the tools against your specific use case.

In this post, we discuss how to author end-to-end ML pipelines in Step Functions using three different methods:

Python – Using the Step Functions Data Science SDK
Drag and drop – Using the new Step Functions Workflow Studio
JSON – Using the Amazon States Language

Solution overview

In this post, we create a simple workflow that involves a training step, creating a model, configuring an endpoint, and deploying the model.

You can create more complex workflows involving other steps, such as Amazon SageMaker Processing or automatic model tuning (HPO). You can also use Step Functions to integrate with other AWS services such as AWS Lambda, Amazon DynamoDB, AWS Glue, Amazon EMR, Amazon Athena, Amazon Elastic Kubernetes Service (Amazon EKS), and AWS Fargate. For more information on supported services, see Supported AWS Service Integrations for Step Functions. We provide guidance on other similar pipeline tools later in this post.

In this post, we use the MNIST dataset, which is a widely used dataset for handwritten digit classification. It consists of 70,000 labeled 28×28 pixel grayscale images of hand-written digits. The dataset is split into 60,000 training images and 10,000 test images. There are 10 classes (one for each of the 10 digits). The code used here closely follows a similar use case where the task is to classify each input image as one of the 10 digits (0–9).

The main training code uses a class from the standard PyTorch example for the model definition:

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

The main training function supports both single-instance and distributed training, and does so by checking the parsed arguments:

is_distributed = len(args.hosts) > 1 and args.backend is not None
if is_distributed:
    # Initialize the distributed environment.
    world_size = len(args.hosts)
    os.environ['WORLD_SIZE'] = str(world_size)
    host_rank = args.hosts.index(args.current_host)
    os.environ['RANK'] = str(host_rank)
    dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)
    logger.info('Initialized the distributed environment: \'{}\' backend on {} nodes. '.format(
        args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format(
        dist.get_rank(), args.num_gpus))

The number of hosts is conveniently stored in an Amazon SageMaker environment variable, which can also be passed in as an argument:

parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))

Next, we load the datasets from the default data directory (args.data_dir, which defaults to the path stored in the SM_CHANNEL_TRAINING environment variable):

train_loader = _get_train_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs)
test_loader = _get_test_data_loader(args.test_batch_size, args.data_dir, **kwargs)

We then enter the main training loop:

for epoch in range(1, args.epochs + 1):
    model.train()
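
For context, the body of this loop follows the standard PyTorch MNIST example. The following is a condensed sketch that assumes device, optimizer, args.log_interval, and a test() helper are defined as in that example (see mnist.py on GitHub for the full version):

for epoch in range(1, args.epochs + 1):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader, 1):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)  # pairs with the log_softmax output of Net
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            logger.info('Train Epoch: {} [{}/{}] Loss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset), loss.item()))
    test(model, test_loader, device)  # evaluate on the held-out test set after each epoch
save_model(model, args.model_dir)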

Finally, we save the model:

def save_model(model, model_dir):
    logger.info("Saving the model.")
    path = os.path.join(model_dir, 'model.pth')
    # recommended way from http://pytorch.org/docs/master/notes/serialization.html
    torch.save(model.cpu().state_dict(), path)

This code is stored in a file called mnist.py and used in later steps (see the full code on GitHub). The following are two important takeaways that connect this script to the pipeline:

You can use the data input location on Amazon Simple Storage Service (Amazon S3) as a parameter for the training step in a pipeline. This data is delivered to the training container, and its local path is stored in an environment variable (for example, SM_CHANNEL_TRAINING).
The model is shown here as being saved locally in model_dir; the local path of the model directory (/opt/ml/model) is stored in an environment variable (SM_MODEL_DIR). At the end of the SageMaker training job, the model is copied to an Amazon S3 location so that model- and endpoint-related pipeline steps can access it (see the snippet after this list for how these variables typically surface in the script).
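
As a minimal illustration of how these environment variables typically surface in the training script (the argument names follow the standard SageMaker PyTorch example and are shown here only for illustration):

import argparse
import json
import os

parser = argparse.ArgumentParser()
# SageMaker injects these environment variables into the training container
parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))
parser.add_argument('--current-host', type=str, default=os.environ['SM_CURRENT_HOST'])
parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])        # /opt/ml/model
parser.add_argument('--data-dir', type=str, default=os.environ['SM_CHANNEL_TRAINING'])  # local copy of the S3 input data
parser.add_argument('--num-gpus', type=int, default=int(os.environ['SM_NUM_GPUS']))
args = parser.parse_args()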

Now let’s look at our three methods to author end-to-end ML pipelines in Step Functions.

Use the Step Functions Data Science SDK

The Step Functions Data Science SDK is an open-source library that lets you create workflows entirely in Python. Installing the SDK is as simple as running the following command:

pip install stepfunctions

The SDK allows you to do the following:

Create steps that accomplish tasks
Chain those steps together into workflows
Branch out to run steps in parallel or based on conditions
Include retry, succeed, or fail steps
Review a graphical representation and definition for your workflow
Create a workflow in Step Functions
Start and review runs in Step Functions

Although we don't use all of these features in this post, the Step Functions Data Science SDK includes the following (a short sketch of some of them follows this list):

Standard states such as Pass, Fail, Succeed, and Wait
Choice rules
Compute steps such as using AWS Lambda, AWS Batch, AWS Glue, and Amazon Elastic Container Service (Amazon ECS)
SageMaker specific steps
Other service integrations such as DynamoDB, Amazon Simple Notification Service (Amazon SNS), Amazon Simple Queue Service (Amazon SQS), and Amazon EMR
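
As a brief sketch of a few of these building blocks (the state names and the $.evaluation_passed input field below are purely illustrative and not part of the example pipeline):

from stepfunctions import steps

# Branch on a boolean field in the state input
quality_gate = steps.Choice('Quality gate')
proceed = steps.Pass('Proceed')
stop = steps.Fail('Stop pipeline', cause='Model did not meet the evaluation threshold')

quality_gate.add_choice(
    rule=steps.ChoiceRule.BooleanEquals(variable='$.evaluation_passed', value=True),
    next_step=proceed,
)
quality_gate.default_choice(next_step=stop)

# Retries can be attached to task-style steps, for example a TrainingStep:
# training_step.add_retry(steps.Retry(
#     error_equals=['States.ALL'], interval_seconds=30, max_attempts=2, backoff_rate=2.0))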

To get started, we first create a PyTorch estimator with the mnist.py file. We configure the estimator with the training script, an AWS Identity and Access Management (IAM) role, the number of training instances, the training instance type, and hyperparameters:

from sagemaker.pytorch import PyTorch

estimator = PyTorch(entry_point='mnist.py',
                    role=sagemaker_execution_role,
                    framework_version='1.2.0',
                    py_version='py3',
                    train_instance_count=2,
                    train_instance_type='ml.c4.xlarge',
                    hyperparameters={
                        'epochs': 6
                    })

The Data Science SDK provides two ways of defining a pipeline. First, you can use individual steps. For example, you can define a training step with the following code:

training_step = steps.TrainingStep("Train Step", estimator=estimator, ...)

Then you create a model step:

model_step = steps.ModelStep("Savemodel", model=training_step.get_expected_model(), ...)

Finally, you chain all the steps using the following code:

workflow_definition = steps.Chain([training_step, model_step, transform_step, endpoint_config_step, endpoint_step])

For more information, see Build a machine learning workflow using Step Functions and SageMaker.
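
If you take the individual-steps route, the following is a minimal sketch of turning the chained definition into a runnable workflow (it assumes the steps above and a workflow_execution_role are already defined; the workflow name and execution inputs are placeholders):

from stepfunctions.workflow import Workflow

workflow = Workflow(
    name='MyTrainingWorkflow',           # placeholder name
    definition=workflow_definition,
    role=workflow_execution_role,
)
workflow.render_graph()                  # inspect the DAG before creating it
workflow.create()                        # registers the state machine in Step Functions
execution = workflow.execute(inputs={})  # pass any inputs your step definitions expect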

Alternatively, you can use a standard training pipeline class that is built into the SDK:

pipeline = TrainingPipeline(
    estimator=estimator,
    role=workflow_execution_role,
    inputs=inputs,
    s3_bucket=bucket
)

The workflow execution role allows you to create and run workflows in Step Functions. The following code renders a graph of the resulting workflow:

pipeline.render_graph()

Finally, you can create and run the workflow using pipeline.create() and pipeline.execute().
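
For reference, those two calls look like the following (the TrainingPipeline class names and wires up the underlying workflow for you):

pipeline.create()               # creates the state machine in Step Functions
execution = pipeline.execute()  # starts a run and returns an execution object you can monitor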

An example output from the execute() statement looks as follows, and provides you with a link to Step Functions where you can view and monitor your execution:

arn:aws:states:us-east-1:account-number:execution:training-pipeline-generated-date:training-pipeline-generated-time.

You can also render the current state of your workflow as it runs from a notebook using the following code:

execution.render_progress()

Use Step Functions Workflow Studio

To use Step Functions Workflow Studio, complete the following steps:

On the Step Functions console, choose Create state machine.
Select Design your workflow visually.
Choose Next.

Search for and filter the SageMaker steps, then drag and drop the SageMaker CreateTrainingJob step onto the canvas.

In a similar fashion, drag and drop the following states in order:
Create Model
Create Endpoint Config
Create Endpoint

Your workflow should now look like the following diagram.

Now, let’s configure each of these steps.

Choose SageMaker CreateTrainingJob and edit the API parameters in the Form box.

Use the following JSON object if you are following this example:

{
  "AlgorithmSpecification": {
    "TrainingImage": "763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.2.0-cpu-py3",
    "TrainingInputMode": "File"
  },
  "HyperParameters": {
    "epochs": "6",
    "sagemaker_submit_directory.$": "$$.Execution.Input.sourcedir",
    "sagemaker_program.$": "$$.Execution.Input.trainfile"
  },
  "InputDataConfig": [
    {
      "ChannelName": "training",
      "DataSource": {
        "S3DataSource": {
          "S3DataType": "S3Prefix",
          "S3Uri": "s3://sagemaker-us-east-1-497456752804/sagemaker/DEMO-pytorch-mnist",
          "S3DataDistributionType": "FullyReplicated"
        }
      }
    }
  ],
  "StoppingCondition": {
    "MaxRuntimeInSeconds": 1000
  },
  "ResourceConfig": {
    "InstanceCount": 2,
    "InstanceType": "ml.c4.xlarge",
    "VolumeSizeInGB": 30
  },
  "OutputDataConfig": {
    "S3OutputPath": "s3://sagemaker-us-east-1-497456752804/stepfunctions-workflow-training-job-v1/models"
  },
  "RoleArn": "arn:aws:iam::497456752804:role/telecomstack-SagemakerExecutionRole-AHSGUPY5EQIK",
  "TrainingJobName.$": "States.Format('trainingjob-{}',$$.Execution.Name)"
}

Select Wait for task to complete.
Edit the API parameters for CreateModel:

{
  "ExecutionRoleArn.$": "$.RoleArn",
  "ModelName.$": "States.Format('model-{}',$$.Execution.Name)",
  "PrimaryContainer": {
    "Image.$": "$.AlgorithmSpecification.TrainingImage",
    "Environment": {
      "SAGEMAKER_PROGRAM.$": "$$.Execution.Input.trainfile",
      "SAGEMAKER_SUBMIT_DIRECTORY.$": "$$.Execution.Input.sourcedir"
    },
    "ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts"
  }
}

Edit the API parameters for CreateEndpointConfig:

{
  "EndpointConfigName.$": "States.Format('config-{}',$$.Execution.Name)",
  "ProductionVariants": [
    {
      "InitialInstanceCount": 1,
      "InitialVariantWeight": 1,
      "InstanceType": "ml.m4.xlarge",
      "ModelName.$": "States.Format('model-{}',$$.Execution.Name)",
      "VariantName": "AllTraffic"
    }
  ]
}

Edit the API parameters for CreateEndpoint:

{
  "EndpointConfigName.$": "States.Format('config-{}',$$.Execution.Name)",
  "EndpointName.$": "States.Format('endpoint-{}',$$.Execution.Name)"
}

Choose Next, review the generated code, and choose Next.

Step Functions can look at the resources you use and create a role. However, you may see the following message:

“Step Functions cannot generate an IAM policy if the RoleArn for SageMaker is from a Path. Hardcode the SageMaker RoleArn in your state machine definition, or choose an existing role with the proper permissions for Step Functions to call SageMaker.”

We use a role that we created in the Data Science SDK section instead.

Select Use existing role and use the role StepFunctionsWorkflowExecutionRole.

Choose Create state machine.
When you receive the message that the machine was successfully created, run it with the following input:

{
  "trainfile": "mnist.py",
  "sourcedir": "s3://path/to/sourcedir.tar.gz"
}

Monitor and wait for the run to finish.
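
You can also start and monitor the same run programmatically. The following is a sketch using boto3; the state machine ARN is a placeholder, and the input matches the JSON shown above:

import json
import time

import boto3

sfn = boto3.client('stepfunctions')

response = sfn.start_execution(
    stateMachineArn='arn:aws:states:us-east-1:123456789012:stateMachine:my-sagemaker-workflow',  # placeholder
    input=json.dumps({
        'trainfile': 'mnist.py',
        'sourcedir': 's3://path/to/sourcedir.tar.gz',
    }),
)

# Poll until the execution leaves the RUNNING state
while True:
    status = sfn.describe_execution(executionArn=response['executionArn'])['status']
    if status != 'RUNNING':
        print('Execution finished with status:', status)
        break
    time.sleep(30)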

Use the Amazon States Language

Both of the methods we just discussed are great ways to quickly prototype a state machine on Step Functions. When you need to edit the Step Functions definition directly, you can use the Amazon States Language. See the following code:

{
  "Comment": "This is your state machine",
  "StartAt": "SageMaker CreateTrainingJob",
  "States": {
    "SageMaker CreateTrainingJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
      "Parameters": {
        "AlgorithmSpecification": {
          "TrainingImage": "763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.2.0-cpu-py3",
          "TrainingInputMode": "File"
        },
        "HyperParameters": {
          "epochs": "6",
          "sagemaker_submit_directory.$": "$$.Execution.Input.sourcedir",
          "sagemaker_program.$": "$$.Execution.Input.trainfile"
        },
        "InputDataConfig": [
          {
            "ChannelName": "training",
            "DataSource": {
              "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": "s3://sagemaker-us-east-1-497456752804/sagemaker/DEMO-pytorch-mnist",
                "S3DataDistributionType": "FullyReplicated"
              }
            }
          }
        ],
        "StoppingCondition": {
          "MaxRuntimeInSeconds": 1000
        },
        "ResourceConfig": {
          "InstanceCount": 2,
          "InstanceType": "ml.c4.xlarge",
          "VolumeSizeInGB": 30
        },
        "OutputDataConfig": {
          "S3OutputPath": "s3://sagemaker-us-east-1-497456752804/stepfunctions-workflow-training-job-v1/models"
        },
        "RoleArn": "arn:aws:iam::497456752804:role/telecomstack-SagemakerExecutionRole-AHSGUPY5EQIK",
        "TrainingJobName.$": "States.Format('trainingjob-{}',$$.Execution.Name)"
      },
      "Next": "SageMaker CreateModel"
    },
    "SageMaker CreateModel": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createModel",
      "Parameters": {
        "ExecutionRoleArn.$": "$.RoleArn",
        "ModelName.$": "States.Format('model-{}',$$.Execution.Name)",
        "PrimaryContainer": {
          "Image.$": "$.AlgorithmSpecification.TrainingImage",
          "Environment": {
            "SAGEMAKER_PROGRAM.$": "$$.Execution.Input.trainfile",
            "SAGEMAKER_SUBMIT_DIRECTORY.$": "$$.Execution.Input.sourcedir"
          },
          "ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts"
        }
      },
      "Next": "SageMaker CreateEndpointConfig"
    },
    "SageMaker CreateEndpointConfig": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createEndpointConfig",
      "Parameters": {
        "EndpointConfigName.$": "States.Format('config-{}',$$.Execution.Name)",
        "ProductionVariants": [
          {
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 1,
            "InstanceType": "ml.m4.xlarge",
            "ModelName.$": "States.Format('model-{}',$$.Execution.Name)",
            "VariantName": "AllTraffic"
          }
        ]
      },
      "Next": "SageMaker CreateEndpoint"
    },
    "SageMaker CreateEndpoint": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createEndpoint",
      "Parameters": {
        "EndpointConfigName.$": "States.Format('config-{}',$$.Execution.Name)",
        "EndpointName.$": "States.Format('endpoint-{}',$$.Execution.Name)"
      },
      "End": true
    }
  }
}

You can create a new Step Functions state machine on the Step Functions console by selecting Write your workflow in code.
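
If you prefer not to use the console, a sketch of registering the same definition with boto3 might look like the following (the definition file name, state machine name, and role ARN are placeholders):

import boto3

sfn = boto3.client('stepfunctions')

with open('state_machine_definition.json') as f:  # the Amazon States Language document shown above
    definition = f.read()

sfn.create_state_machine(
    name='sagemaker-training-pipeline',  # placeholder name
    definition=definition,
    roleArn='arn:aws:iam::123456789012:role/StepFunctionsWorkflowExecutionRole',  # placeholder
)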

A successful run shows each state in green.

Each state points to resources in SageMaker. In the following screenshot, the link under Resource points to the model created as a result of the CreateModel step.

When to use what?

The following summarizes the service integrations supported by each authoring method.

Supported by all three methods (the Amazon States Language, the new Step Functions Workflow Studio, and the AWS Step Functions Data Science SDK): AWS Lambda, AWS Batch, Amazon DynamoDB, Amazon ECS/AWS Fargate, Amazon SNS, Amazon SQS, AWS Glue, Amazon SageMaker, and Amazon EMR.

Supported by the Amazon States Language and Step Functions Workflow Studio, but not the Data Science SDK: Amazon EMR on EKS, AWS CodeBuild, Amazon Athena, Amazon EKS, Amazon API Gateway, AWS Glue DataBrew, Amazon EventBridge, and AWS Step Functions.

Although the Step Functions Data Science SDK covers most of what you typically need for your pipelines, you may need to integrate with services that are supported only by the other options listed above.

In addition, consider the skillsets of your existing team: teams that are used to working with a particular tool may prefer to stick with it to maximize productivity. This applies not only to the Step Functions options explored here, but also to others such as the AWS Cloud Development Kit (AWS CDK), AWS Serverless Application Model (AWS SAM), Airflow, Kubeflow, and SageMaker Pipelines. Specifically regarding Pipelines, consider that data scientists and ML engineers may benefit from working on a single platform that not only maintains and runs pipelines, but also manages models, endpoints, notebooks, and other features.

Lastly, consider a hybrid set of services for using the right tool for the right job. For example:

You can use AWS CodePipeline along with Step Functions for orchestrating ML pipelines that require custom containers. CodePipeline invokes Step Functions and passes the container image URI and the unique container image tag as parameters to Step Functions. For more information, see Build a CI/CD pipeline for deploying custom machine learning models using AWS services.
You can use Kubeflow pipelines to define the training pipeline, and SageMaker to host trained models on the cloud. For more information, see Cisco uses Amazon SageMaker and Kubeflow to create a hybrid machine learning workflow.
You can use Pipelines to automate feature engineering pipelines using SageMaker Data Wrangler and SageMaker Feature Store. Pipelines is a purpose-built CI/CD tool for ML model building and deployment that not only includes workflow orchestration, but also related concepts such as a model registry, lineage tracking, and projects. When upstream processes are already using Step Functions, for example to prepare data using AWS services, consider a hybrid architecture where both Step Functions and Pipelines are used. For more information, see Automate feature engineering pipelines with Amazon SageMaker.
When developers and data scientists need to write infrastructure as code with unit testing, consider tools like the Step Functions Data Science SDK and Apache Airflow, because you can use Python to define end-to-end architectures and pipelines.

Summary

In this post, we looked at three different ways of authoring and running Step Functions pipelines, specifically for end-to-end ML workflows. Choosing a pipelining tool for ML is an important step for a team, and is a decision that needs to consider the context, existing skillsets, connections to various other teams with these skillsets in an organization, available service integration, service limits, and applicable quotas. Contact your AWS team to help guide you through these decisions; we are eager to help!

For further reading, check out the following:

New – AWS Step Functions Workflow Studio – A Low-Code Visual Tool for Building State Machines
The step-functions-data-science-sdk GitHub repo
Prototyping at speed with AWS Step Functions new Workflow Studio
Introducing Amazon Managed Workflows for Apache Airflow (MWAA) 
Introducing Amazon SageMaker Components for Kubeflow Pipelines

About the Author

Shreyas Subramanian is an AI/ML Specialist Solutions Architect who helps customers solve their business challenges on the AWS Cloud using machine learning.
