The ability to predict that a particular customer is at high risk of churning, while there is still time to do something about it, represents a significant potential revenue source for online businesses. Depending on the industry and business objective, the problem statement can be multi-layered. The following are some business objectives based on this strategy:
Develop a framework to build propensity models that estimate the probability that a given customer will remain a paid customer over several rolling time windows, such as 15, 30, and 45 days
Develop a framework for better targeting win-back campaigns
Identify features that are the biggest differentiators amongst customers
This post discusses how you can orchestrate an end-to-end churn prediction model across each step: data preparation, experimenting with a baseline model and hyperparameter optimization (HPO), training and tuning, and registering the best model. You can manage your Amazon SageMaker training and inference workflows using Amazon SageMaker Studio and the SageMaker Python SDK. SageMaker offers all the tools you need to create high-quality data science solutions.
SageMaker helps data scientists and developers prepare, build, train, and deploy high-quality machine learning (ML) models quickly by bringing together a broad set of capabilities purpose-built for ML.
Studio provides a single, web-based visual interface where you can perform all ML development steps, improving data science team productivity by up to 10 times.
By the end of this post, you should have enough information to use this end-to-end template with Amazon SageMaker Pipelines to train, tune, and deploy a model for your own predictive analytics use case. The full instructions are available on the GitHub repo.
In this solution, your entry point is the Studio integrated development environment (IDE) for rapid experimentation. Studio offers an environment to manage the end-to-end Pipelines experience. With Studio, you can bypass the AWS Management Console for your entire workflow management. For more information on managing Pipelines from Studio, see View, Track, and Execute SageMaker Pipelines in SageMaker Studio.
The following diagram illustrates the high-level architecture of the data science workflow.
Customer churn model development using Studio notebooks.
Preprocess the data to build the required features and split the data into training, validation, and test datasets.
Apply hyperparameter tuning based on the ranges provided with the SageMaker XGBoost framework to give the best model, which is determined based on AUC score.
Evaluate the best model using the test dataset.
Check if the AUC score is above a certain threshold. If so, proceed to the next steps.
Register the trained churn model in the SageMaker Model Registry.
Create a SageMaker model by taking the artifacts of the best model.
Apply batch transform on the given dataset by using the model created in the previous step.
Create the config file, which includes information such as which columns to check for bias, baseline values for generating SHAP (Shapley value) plots, and more.
Apply Clarify using the config file created in the previous step to generate model explainability and bias information reports.
Pipelines is integrated directly with SageMaker, so you don’t need to interact with any other AWS services. You also don’t need to manage any resources because Pipelines is a fully managed service, which means that it creates and manages resources for you. For more information on the various SageMaker components, which are available both as standalone Python APIs and as integrated components of Studio, see the SageMaker service page.
For this use case, you use the following components for the fully automated model development process:
A SageMaker pipeline is a series of interconnected steps that is defined by a JSON pipeline definition. This pipeline definition encodes a pipeline using a directed acyclic graph (DAG). This DAG gives information on the requirements for and relationships between each step of your pipeline. The structure of a pipeline’s DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step’s output are passed as the input to another step.
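To make the idea of a DAG built from data dependencies concrete, here is a minimal sketch in plain Python. It is not the Pipelines implementation and does not use the SageMaker SDK; it only shows how a run order can be derived once you know which step consumes which step's output. The step names ChurnModelProcess, ChurnHyperParameterTuning, RegisterChurnModel, and ChurnTransform appear later in this post; the remaining names are hypothetical placeholders.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each step to the set of steps whose outputs it consumes.
# Names not used elsewhere in this post are hypothetical.
dependencies = {
    "ChurnModelProcess": set(),
    "ChurnHyperParameterTuning": {"ChurnModelProcess"},
    "ChurnEvalBestModel": {"ChurnModelProcess", "ChurnHyperParameterTuning"},
    "CheckAUCScore": {"ChurnEvalBestModel"},
    "RegisterChurnModel": {"CheckAUCScore"},
    "ChurnCreateModel": {"CheckAUCScore"},
    "ChurnTransform": {"ChurnCreateModel"},
    "ClarifyProcessing": {"ChurnCreateModel"},
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Steps with no dependency on each other (for example, the registration and model creation steps in this sketch) can appear in either order, which is also why such steps can run in parallel.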
For this post, our use case is a classic ML problem: understanding which marketing strategies, based on consumer behavior, a retail store can adopt to increase customer retention. The following diagram illustrates the complete ML workflow for the churn prediction use case.
Let’s go through the accelerated ML workflow development process in detail.
The following screenshot shows the sample set with the target variable retained, which is 1 if the customer is assumed to be active and 0 otherwise.
Studio notebooks with elastic compute let you easily run multiple training and tuning jobs. For this use case, you use the SageMaker built-in XGBoost algorithm and SageMaker HPO, with the objective set to binary:logistic and eval_metric set to auc.
Run the following code in a Studio notebook to preprocess the dataset and upload it to your own S3 bucket:
import numpy as np
import pandas as pd

def split_datasets(df):
    # Move the target into the first column, as the built-in XGBoost algorithm expects for CSV input
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    feature_names = list(X_pre.columns)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # 70% train, 15% validation, 15% test
    train, validation, test = np.split(X, [int(.7 * len(X)), int(.85 * len(X))])
    return feature_names, train, validation, test

# Split dataset (storedata is the DataFrame loaded earlier in the notebook)
feature_names, train, validation, test = split_datasets(storedata)
# Save datasets in Amazon S3
pd.DataFrame(train).to_csv(f"s3://{default_bucket}/data/train/train.csv", header=False, index=False)
pd.DataFrame(validation).to_csv(f"s3://{default_bucket}/data/validation/validation.csv", header=False, index=False)
pd.DataFrame(test).to_csv(f"s3://{default_bucket}/data/test/test.csv", header=False, index=False)
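As a small illustration of how the two split points translate into dataset sizes, the following sketch reproduces the same 70/15/15 cut with plain Python lists. The row count of 1,000 and the fixed seed are assumptions for the example; the notebook code shuffles without a seed.

```python
import random


def split_points(n_rows, train_end=0.7, validation_end=0.85):
    """Return the two cut indices, computed the same way as in the np.split call."""
    return int(train_end * n_rows), int(validation_end * n_rows)


rows = list(range(1000))          # stand-in for 1,000 dataset rows (assumed size)
random.Random(42).shuffle(rows)   # seeded only so the example is repeatable

first_cut, second_cut = split_points(len(rows))
train_rows = rows[:first_cut]
validation_rows = rows[first_cut:second_cut]
test_rows = rows[second_cut:]

print(len(train_rows), len(validation_rows), len(test_rows))  # 700 150 150
```

Seeding the shuffle is a design choice worth considering if you need the same split across notebook runs.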
Train, tune, and find the best candidate model with the following code:
import sagemaker
from sagemaker.inputs import TrainingInput

# region, role, and default_bucket are assumed to be defined earlier in the notebook

# Training and Validation Input for SageMaker Training job
s3_input_train = TrainingInput(
    s3_data=f"s3://{default_bucket}/data/train/", content_type="csv")
s3_input_validation = TrainingInput(
    s3_data=f"s3://{default_bucket}/data/validation/", content_type="csv")
# Hyperparameter used
fixed_hyperparameters = {
    "eval_metric": "auc",
    "objective": "binary:logistic",
    "num_round": "100",
    "rate_drop": "0.3",
    "tweedie_variance_power": "1.4"
}
# Use the built-in SageMaker algorithm
sess = sagemaker.Session()
container = sagemaker.image_uris.retrieve("xgboost", region, "0.90-2")
estimator = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    hyperparameters=fixed_hyperparameters,
    instance_type="ml.m4.xlarge",
    output_path="s3://{}/output".format(default_bucket),
    sagemaker_session=sess  # use the session created above
)
from sagemaker.tuner import ContinuousParameter, HyperparameterTuner, IntegerParameter

hyperparameter_ranges = {
    "eta": ContinuousParameter(0, 1),
    "min_child_weight": ContinuousParameter(1, 10),
    "alpha": ContinuousParameter(0, 2),
    "max_depth": IntegerParameter(1, 10),
}
objective_metric_name = "validation:auc"
tuner = HyperparameterTuner(
    estimator, objective_metric_name,
    hyperparameter_ranges, max_jobs=10, max_parallel_jobs=2)
# Tune
tuner.fit({
    "train": s3_input_train,
    "validation": s3_input_validation
}, include_cls_metadata=False)
import boto3
from pprint import pprint

## Explore the best model generated
tuning_job_result = boto3.client("sagemaker").describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuner.latest_tuning_job.job_name
)
job_count = tuning_job_result["TrainingJobStatusCounters"]["Completed"]
print("%d training jobs have completed" % job_count)
## 10 training jobs have completed
## Get the best training job
if tuning_job_result.get("BestTrainingJob", None):
    print("Best Model found so far:")
    pprint(tuning_job_result["BestTrainingJob"])
else:
    print("No training jobs have reported results yet.")
After you establish a baseline, you can use Amazon SageMaker Debugger for offline model analysis. Debugger is a capability within SageMaker that automatically provides visibility into the model training process for real-time and offline analysis. Debugger saves the internal model state at periodic intervals, which you can analyze in real time during training and offline after the training is complete. For this use case, you use the explainability tool SHAP (SHapley Additive exPlanations) and the native integration of SHAP with Debugger. Refer to the following notebook for detailed analysis.
The following summary plot explains the positive and negative relationships of the predictors with the target variable. For example, the top variable here, esent, is defined as the number of emails sent. The plot is built from all data points in the training set. Blue indicates a feature value pushing the final output toward class 0, and pink indicates a push toward class 1. Key influencing features are ranked in descending order of importance.
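To show what a SHAP value represents, the following sketch computes exact Shapley values for a tiny toy scoring function by averaging each feature's marginal contribution over every feature ordering. This is an illustration under stated assumptions, not how the SHAP library or Debugger computes them (they use faster algorithms for tree models). The feature names esent, eopenrate, and paperless come from the dataset in this post; the toy_score function, its coefficients, and the sample values are hypothetical.

```python
from itertools import permutations


def shapley_values(predict, instance, baseline):
    """Exact Shapley values by enumerating all feature orderings (small inputs only)."""
    n = len(instance)
    totals = [0.0] * n
    orderings = list(permutations(range(n)))
    for ordering in orderings:
        current = list(baseline)
        previous = predict(current)
        for i in ordering:
            current[i] = instance[i]          # switch feature i from baseline to actual value
            updated = predict(current)
            totals[i] += updated - previous   # marginal contribution of feature i
            previous = updated
    return [t / len(orderings) for t in totals]


def toy_score(x):
    """Hypothetical scoring function standing in for a trained model."""
    esent, eopenrate, paperless = x
    return 0.02 * esent + 0.01 * eopenrate + 0.1 * paperless * (esent > 10)


baseline = [0, 0, 0]      # reference customer (assumed)
instance = [30, 50, 1]    # customer being explained (assumed)
phi = shapley_values(toy_score, instance, baseline)

for name, value in zip(["esent", "eopenrate", "paperless"], phi):
    print(f"{name}: {value:+.3f}")
```

The values always sum to the difference between the score for the instance and the score for the baseline, which is the additive property that the summary plot relies on.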
Now you can proceed with the deploy and manage step of the ML workflow.
Develop and automate the workflow
Let’s start with the project structure:
/customer-churn-model – Project name
/data – Dataset
/pipelines – Code for SageMaker pipeline components
SageMaker_Pipelines_project.ipynb – Allows you to create and run the ML workflow
Customer_Churn_Modeling.ipynb – Baseline model development notebook
Under <project-name>/pipelines/customerchurn, you can see the following Python scripts:
preprocess.py – Integrates with SageMaker Processing for feature engineering
evaluate.py – Calculates model metrics, in this case auc_score
generate_config.py – Generates the dynamic configuration needed by the downstream Clarify job for model explainability
pipeline.py – Templatized code for the Pipelines ML workflow
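Because the AUC score drives both model selection and the quality gate in this workflow, it can help to see what the metric measures. The following is a minimal, dependency-free sketch: AUC is the fraction of (positive, negative) pairs in which the positive example receives the higher score, with ties counted as half. It is an illustration only and is not the code in the evaluation script; the labels and scores are made-up sample values.

```python
def auc_score(y_true, y_score):
    """Pairwise AUC: probability that a random positive outranks a random negative."""
    positives = [s for y, s in zip(y_true, y_score) if y == 1]
    negatives = [s for y, s in zip(y_true, y_score) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up labels (1 = retained) and predicted probabilities
y_true = [1, 0, 1, 1, 0, 0]
y_score = [0.9, 0.3, 0.6, 0.4, 0.5, 0.2]

auc = auc_score(y_true, y_score)
print(round(auc, 3))  # 0.889
```

The pairwise loop is quadratic in the number of rows, so for real datasets a library implementation based on sorting is the better choice.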
Let’s walk through each step in the DAG and how it runs. The steps are similar to when we first prepared the data.
Perform data readiness with the following code:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep

# processing step for feature engineering
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    sagemaker_session=sagemaker_session,
    role=role,
)
step_process = ProcessingStep(
    name="ChurnModelProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                         destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                         destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
Train, tune, and find the best candidate model:
import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.tuner import ContinuousParameter, IntegerParameter

# training step for generating model artifacts
model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
fixed_hyperparameters = {
    "eval_metric": "auc",
    "objective": "binary:logistic",
    "num_round": "100",
    "rate_drop": "0.3",
    "tweedie_variance_power": "1.4"
}
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    hyperparameters=fixed_hyperparameters,
    output_path=model_path,
    base_job_name="churn-train",
    sagemaker_session=sagemaker_session,
    role=role,
)
hyperparameter_ranges = {
    "eta": ContinuousParameter(0, 1),
    "min_child_weight": ContinuousParameter(1, 10),
    "alpha": ContinuousParameter(0, 2),
    "max_depth": IntegerParameter(1, 10),
}
objective_metric_name = "validation:auc"
You can add a model tuning step (TuningStep) in the pipeline, which automatically invokes a hyperparameter tuning job (see the following code). The hyperparameter tuning finds the best version of a model by running many training jobs on the dataset using the algorithm and the ranges of hyperparameters that you specified. You can then register the best version of the model into the model registry using the RegisterModel step.
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import HyperparameterTuner
from sagemaker.workflow.steps import TuningStep

## Direct Integration for HPO
step_tuning = TuningStep(
    name="ChurnHyperParameterTuning",
    tuner=HyperparameterTuner(xgb_train, objective_metric_name, hyperparameter_ranges,
                              max_jobs=2, max_parallel_jobs=2),
    inputs={
        # The inputs reference the processing step outputs, which creates the DAG dependency
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
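To illustrate the idea of running many training jobs over the specified hyperparameter ranges and keeping the best one, here is a minimal sketch that uses plain random search. This is a deliberately simpler, swapped-in technique for illustration; it is not how the SageMaker tuning job chooses candidates. The ranges mirror the ones in this post, while mock_validation_auc is a hypothetical stand-in for a real training job that would report validation:auc.

```python
import random

# Same ranges as in this post; float bounds are sampled continuously, int bounds as integers
ranges = {
    "eta": (0.0, 1.0),
    "min_child_weight": (1.0, 10.0),
    "alpha": (0.0, 2.0),
    "max_depth": (1, 10),
}


def sample_hyperparameters(rng):
    """Draw one candidate configuration from the ranges."""
    return {
        name: rng.randint(low, high) if isinstance(low, int) else rng.uniform(low, high)
        for name, (low, high) in ranges.items()
    }


def mock_validation_auc(params):
    """Hypothetical objective standing in for a training job's validation:auc."""
    return 0.9 - 0.2 * abs(params["eta"] - 0.3) - 0.01 * abs(params["max_depth"] - 6)


def random_search(max_jobs, seed=0):
    """Evaluate max_jobs random candidates and return the one with the highest objective."""
    rng = random.Random(seed)
    candidates = [sample_hyperparameters(rng) for _ in range(max_jobs)]
    return max(candidates, key=mock_validation_auc)


best = random_search(max_jobs=10)
print(best, round(mock_validation_auc(best), 3))
```

The max_jobs argument plays the same role as in the tuner: it caps how many candidates are tried, which is the main lever for trading cost against search quality.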
After you tune the model, depending on the tuning job objective metrics, you can use branching logic when orchestrating the workflow. For this post, the conditional step for model quality check is as follows:
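from sagemaker.workflow.condition_step import JsonGet
from sagemaker.workflow.conditions import ConditionGreaterThan

# condition step for evaluating model quality and branching execution
# (the variable is named cond_lte, but the check is "AUC score greater than 0.75")
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value"
    ),
    right=0.75,
)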
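The json_path in the condition implies that the evaluation report is a nested JSON document with the AUC stored under classification_metrics, auc_score, value. The following plain-Python sketch shows that lookup and the threshold comparison. It only mimics the logic and is not the SDK's JsonGet; the sample AUC of 0.81 and the helper name json_get are hypothetical.

```python
import json
from functools import reduce


def json_get(document, json_path):
    """Resolve a dot-separated path such as 'a.b.c' against nested dictionaries."""
    return reduce(lambda node, key: node[key], json_path.split("."), document)


# Hypothetical evaluation report, shaped to match the json_path used in the condition
report_text = '{"classification_metrics": {"auc_score": {"value": 0.81}}}'
report = json.loads(report_text)

auc_value = json_get(report, "classification_metrics.auc_score.value")
auc_threshold = 0.75  # same threshold as the condition in this post

proceed = auc_value > auc_threshold
print(f"AUC={auc_value}, proceed to register and score: {proceed}")
```

If the evaluation script writes the metric under a different key, the path in the condition must change to match, otherwise the lookup fails at run time.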
The best candidate model is registered for batch scoring using the RegisterModel step:
from sagemaker.workflow.step_collections import RegisterModel

step_register = RegisterModel(
    name="RegisterChurnModel",
    estimator=xgb_train,
    # top_k=0 selects the best-performing training job from the tuning step
    model_data=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=default_bucket, prefix="output"),
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
)
Now that the model is trained, let’s see how Clarify helps us understand which features the model bases its predictions on. You can create an analysis_config.json file dynamically per workflow run using the generate_config.py utility. You can version and track the config file per pipeline run ID and store it in Amazon S3 for future reference. Initialize the DataConfig and ModelConfig objects as follows:
import sagemaker.clarify

data_config = sagemaker.clarify.DataConfig(
    s3_data_input_path=f"s3://{args.default_bucket}/output/train/train.csv",
    s3_output_path=args.bias_report_output_path,
    label=0,  # the target is the first column
    headers=[
        "target", "esent", "eopenrate", "eclickrate", "avgorder", "ordfreq",
        "paperless", "refill", "doorstep", "first_last_days_diff", "created_first_days_diff",
        "favday_Friday", "favday_Monday", "favday_Saturday", "favday_Sunday",
        "favday_Thursday", "favday_Tuesday", "favday_Wednesday",
        "city_BLR", "city_BOM", "city_DEL", "city_MAA",
    ],
    dataset_type="text/csv",
)
model_config = sagemaker.clarify.ModelConfig(
    model_name=args.modelname,
    instance_type=args.clarify_instance_type,
    instance_count=1,
    accept_type="text/csv",
)
model_predicted_label_config = sagemaker.clarify.ModelPredictedLabelConfig(probability_threshold=0.5)
bias_config = sagemaker.clarify.BiasConfig(
    label_values_or_threshold=[1],
    facet_name="doorstep",
    facet_values_or_threshold=[0],
)
After you add the Clarify step as a postprocessing job using sagemaker.clarify.SageMakerClarifyProcessor in the pipeline, you can see a detailed feature and bias analysis report per pipeline run.
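To give a sense of what one number in a bias report means, the following sketch computes the difference in proportions of labels (DPL) between the two groups of the doorstep facet, with doorstep equal to 0 treated as the group of interest as in the BiasConfig in this post. It is a simplified illustration of one pre-training metric, assuming DPL is the positive-label rate of the other group minus that of the facet group; it is not the Clarify implementation, and the eight rows are made-up data.

```python
def positive_rate(rows, label):
    """Share of rows whose label equals 1."""
    return sum(r[label] for r in rows) / len(rows)


def difference_in_proportions_of_labels(rows, facet, facet_value, label="target"):
    """Positive-label rate outside the facet group minus the rate inside it."""
    facet_group = [r for r in rows if r[facet] == facet_value]
    other_group = [r for r in rows if r[facet] != facet_value]
    return positive_rate(other_group, label) - positive_rate(facet_group, label)


# Made-up rows: doorstep flag and retained target
rows = [
    {"doorstep": 0, "target": 1},
    {"doorstep": 0, "target": 0},
    {"doorstep": 0, "target": 0},
    {"doorstep": 0, "target": 1},
    {"doorstep": 1, "target": 1},
    {"doorstep": 1, "target": 1},
    {"doorstep": 1, "target": 1},
    {"doorstep": 1, "target": 0},
]

dpl = difference_in_proportions_of_labels(rows, facet="doorstep", facet_value=0)
print(dpl)  # 0.25
```

A value near zero suggests the two groups have similar shares of retained customers in the data, while a large positive or negative value points to an imbalance worth investigating before trusting the model's predictions for that group.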
As the final step of the pipeline workflow, you can use the TransformStep step for offline scoring. Pass in the transformer instance and the TransformInput with the batch_data pipeline parameter defined earlier:
# step to perform batch transformation
transformer = Transformer(
model_name=step_create_model.properties.ModelName,
instance_type=”ml.m5.xlarge”,
instance_count=1,
output_path=f”s3://{default_bucket}/ChurnTransform”
)
step_transform = TransformStep(
name=”ChurnTransform”,
transformer=transformer,
inputs=TransformInput(data=batch_data,content_type=”text/csv”)
)
Finally, you can trigger a new pipeline run by choosing Start an execution on the Studio IDE interface.
You can also describe a pipeline run or start the pipeline using the following notebook. The following screenshot shows our output.
You can schedule your SageMaker model building pipeline runs using Amazon EventBridge. SageMaker model building pipelines are supported as a target in Amazon EventBridge. This allows you to trigger your pipeline to run based on any event in your event bus. EventBridge enables you to automate your pipeline runs and respond automatically to events such as training job or endpoint status changes. Events include a new file being uploaded to your S3 bucket, a change in status of your SageMaker endpoint due to drift, and Amazon Simple Notification Service (Amazon SNS) topics.
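As a rough sketch of what a scheduled trigger could look like, the following code builds the request payloads for an EventBridge rule that runs once a day and a target that points at a pipeline. All ARNs, the rule name, the bucket path, and the pipeline parameter name BatchData are hypothetical placeholders, and the boto3 calls are left as comments so the example runs without AWS credentials; treat it as a starting point under those assumptions rather than a verified deployment script.

```python
import json

# Hypothetical identifiers; replace with your own account, Region, pipeline, and role
PIPELINE_ARN = "arn:aws:sagemaker:us-east-1:111122223333:pipeline/churn-pipeline"
ROLE_ARN = "arn:aws:iam::111122223333:role/EventBridgeStartPipelineRole"

# Rule that fires once a day
rule = {
    "Name": "churn-pipeline-daily",
    "ScheduleExpression": "rate(1 day)",
    "State": "ENABLED",
}

# Target that starts the pipeline; the parameter name and value are assumptions
target = {
    "Id": "churn-pipeline-target",
    "Arn": PIPELINE_ARN,
    "RoleArn": ROLE_ARN,
    "SageMakerPipelineParameters": {
        "PipelineParameterList": [
            {"Name": "BatchData", "Value": "s3://my-bucket/data/batch/batch.csv"}
        ]
    },
}

# With AWS credentials configured, the payloads could be submitted like this:
# import boto3
# events = boto3.client("events")
# events.put_rule(**rule)
# events.put_targets(Rule=rule["Name"], Targets=[target])

print(json.dumps({"rule": rule, "target": target}, indent=2))
```

The role referenced by the target needs permission to start the pipeline run, which is a common cause of rules that fire but never start anything.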
Conclusion
This post explained how to use SageMaker Pipelines with other built-in SageMaker features and the XGBoost algorithm to develop, iterate, and deploy the best candidate model for churn prediction. For instructions on implementing this solution, see the GitHub repo. You can also clone and extend this solution with additional data sources for model retraining. We encourage you to reach out and discuss your ML use cases with your AWS account manager.
Additional references
For additional information, see the following resources:
Predicting Customer Churn with Amazon Machine Learning
Building, automating, managing, and scaling ML workflows using Amazon SageMaker Pipelines
ML Explainability with Amazon SageMaker Debugger
About the Authors
Gayatri Ghanakota is a Machine Learning Engineer with AWS Professional Services. She is passionate about developing, deploying, and explaining AI/ML solutions across various domains. Prior to this role, she led multiple initiatives as a data scientist and ML engineer with top global firms in the financial and retail space. She holds a master’s degree in Computer Science with a specialization in Data Science from the University of Colorado, Boulder.
Sarita Joshi is a Senior Data Scientist with AWS Professional Services focused on supporting customers across industries including retail, insurance, manufacturing, travel, life sciences, media and entertainment, and financial services. She has several years of experience as a consultant advising clients across many industries and technical domains, including AI, ML, analytics, and SAP. Today, she is passionately working with customers to develop and implement machine learning and AI solutions at scale.