Thursday, May 30, 2024

Unlock ML insights using the Amazon SageMaker Feature Store Feature Processor

Amazon SageMaker Feature Store provides an end-to-end solution to automate feature engineering for machine learning (ML). For many ML use cases, raw data like log files, sensor readings, or transaction records need to be transformed into meaningful features that are optimized for model training.

Feature quality is critical to ensure a highly accurate ML model. Transforming raw data into features using aggregation, encoding, normalization, and other operations is often needed and can require significant effort. Engineers must manually write custom data preprocessing and aggregation logic in Python or Spark for each use case.

This undifferentiated heavy lifting is cumbersome, repetitive, and error-prone. The SageMaker Feature Store Feature Processor reduces this burden by automatically transforming raw data into aggregated features suitable for batch training ML models. It lets engineers provide simple data transformation functions, then handles running them at scale on Spark and managing the underlying infrastructure. This enables data scientists and data engineers to focus on the feature engineering logic rather than implementation details.

In this post, we demonstrate how a car sales company can use the Feature Processor to transform raw sales transaction data into features in three steps:

Local runs of data transformations.
Remote runs at scale using Spark.
Operationalization via pipelines.

We show how SageMaker Feature Store ingests the raw data, runs feature transformations remotely using Spark, and loads the resulting aggregated features into a feature group. These engineered features can then be used to train ML models.

For this use case, we see how SageMaker Feature Store helps convert the raw car sales data into structured features. These features are subsequently used to gain insights like:

Average and maximum price of red convertibles from 2010
Models with best mileage vs. price
Sales trends of new vs. used cars over the years
Differences in average MSRP across locations

We also see how SageMaker Feature Store pipelines keep the features updated as new data comes in, enabling the company to continually gain insights over time.

Solution overview

We work with the dataset car_data.csv, which contains specifications such as model, year, status, mileage, price, and MSRP for used and new cars sold by the company. The following screenshot shows an example of the dataset.

The solution notebook feature_processor.ipynb contains the following main steps, which we explain in this post:

Create two feature groups: one called car-data for raw car sales records and another called car-data-aggregated for aggregated car sales records.
Use the @feature_processor decorator to load data into the car-data feature group from Amazon Simple Storage Service (Amazon S3).
Run the @feature_processor code remotely as a Spark application to aggregate the data.
Operationalize the feature processor via SageMaker pipelines and schedule runs.
Explore the feature processing pipelines and lineage in Amazon SageMaker Studio.
Use aggregated features to train an ML model.

Prerequisites

To follow this tutorial, you need the following:

An AWS account.
SageMaker Studio set up.
An AWS Identity and Access Management (IAM) role with the required permissions. When creating this role, follow the best practice of granting least-privilege access.

For this post, we refer to the following notebook, which demonstrates how to get started with Feature Processor using the SageMaker Python SDK.

Create feature groups

To create the feature groups, complete the following steps:

Create a feature group definition for car-data as follows:

from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

# region, aws_account_id, offline_store_role, s3_bucket, and s3_offline_store_prefix
# are assumed to be defined earlier in the notebook.

# Feature Group – Car Sales
CAR_SALES_FG_NAME = "car-data"
CAR_SALES_FG_ARN = f"arn:aws:sagemaker:{region}:{aws_account_id}:feature-group/{CAR_SALES_FG_NAME}"
CAR_SALES_FG_ROLE_ARN = offline_store_role
CAR_SALES_FG_OFFLINE_STORE_S3_URI = f"s3://{s3_bucket}/{s3_offline_store_prefix}"
CAR_SALES_FG_FEATURE_DEFINITIONS = [
    FeatureDefinition(feature_name="id", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="model", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="model_year", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="status", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="mileage", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="price", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="msrp", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="ingest_time", feature_type=FeatureTypeEnum.FRACTIONAL),
]

The features correspond to each column in the car_data.csv dataset (Model, Year, Status, Mileage, Price, and MSRP).

Designate id as the record identifier and ingest_time as the event time feature of the car-data feature group.
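A minimal sketch of this step follows. The constant names are assumptions that follow the CAR_SALES_FG_* naming pattern used above; the loop simply confirms that both special features are also declared as regular feature definitions, which a feature group requires.

```python
# Hypothetical constant names, following the CAR_SALES_FG_* pattern used above.
CAR_SALES_FG_RECORD_IDENTIFIER_NAME = "id"            # uniquely identifies each sales record
CAR_SALES_FG_EVENT_TIME_FEATURE_NAME = "ingest_time"  # when the record was ingested

# Plain-string copy of the feature names declared in the car-data definition.
CAR_SALES_FEATURE_NAMES = ["id", "model", "model_year", "status",
                           "mileage", "price", "msrp", "ingest_time"]

# Both special features must also appear among the declared feature definitions.
for name in (CAR_SALES_FG_RECORD_IDENTIFIER_NAME, CAR_SALES_FG_EVENT_TIME_FEATURE_NAME):
    if name not in CAR_SALES_FEATURE_NAMES:
        raise ValueError(f"{name!r} is not a declared feature")
```

These names are then passed as the record_identifier_name and event_time_feature_name arguments when the feature group is created.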


Create a feature group definition for car-data-aggregated as follows:

# Feature Group – Aggregated Car Sales
AGG_CAR_SALES_FG_NAME = "car-data-aggregated"
AGG_CAR_SALES_FG_ARN = f"arn:aws:sagemaker:{region}:{aws_account_id}:feature-group/{AGG_CAR_SALES_FG_NAME}"
AGG_CAR_SALES_FG_ROLE_ARN = offline_store_role
AGG_CAR_SALES_FG_OFFLINE_STORE_S3_URI = f"s3://{s3_bucket}/{s3_offline_store_prefix}"
AGG_CAR_SALES_FG_FEATURE_DEFINITIONS = [
    FeatureDefinition(feature_name="model_year_status", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="avg_mileage", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="max_mileage", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="avg_price", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="max_price", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="avg_msrp", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="max_msrp", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="ingest_time", feature_type=FeatureTypeEnum.FRACTIONAL),
]

For the aggregated feature group, the features are model year status, average mileage, max mileage, average price, max price, average MSRP, max MSRP, and ingest time. We add the record identifier model_year_status and event time ingest_time to this feature group.

Now, create the car-data feature group:

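from sagemaker.feature_store.feature_group import FeatureGroup
# Create Feature Group – Car sale records.
car_sales_fg = FeatureGroup(name=CAR_SALES_FG_NAME, sagemaker_session=sagemaker_session,
                            feature_definitions=CAR_SALES_FG_FEATURE_DEFINITIONS)

create_car_sales_fg_resp = car_sales_fg.create(
    s3_uri=CAR_SALES_FG_OFFLINE_STORE_S3_URI, role_arn=CAR_SALES_FG_ROLE_ARN,
    record_identifier_name="id", event_time_feature_name="ingest_time")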

Create the car-data-aggregated feature group:

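# Create Feature Group – Aggregated car sales records.
agg_car_sales_fg = FeatureGroup(name=AGG_CAR_SALES_FG_NAME, sagemaker_session=sagemaker_session,
                                feature_definitions=AGG_CAR_SALES_FG_FEATURE_DEFINITIONS)

create_agg_car_sales_fg_resp = agg_car_sales_fg.create(
    s3_uri=AGG_CAR_SALES_FG_OFFLINE_STORE_S3_URI, role_arn=AGG_CAR_SALES_FG_ROLE_ARN,
    record_identifier_name="model_year_status", event_time_feature_name="ingest_time")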

You can navigate to the SageMaker Feature Store option under Data on the SageMaker Studio Home menu to see the feature groups.

Use the @feature_processor decorator to load data

In this section, we locally transform the raw input data (car_data.csv) from Amazon S3 into the car-data feature group using the Feature Store Feature Processor. This initial local run allows us to develop and iterate before running remotely, and could be done on a sample of the data if desired for faster iteration.

With the @feature_processor decorator, your transformation function runs in a Spark runtime environment where the input arguments provided to your function and its return value are Spark DataFrames.
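Each configured input arrives as one DataFrame argument, so the function's data parameters have to line up one-to-one with the decorator's inputs. The following hypothetical plain-Python check (not the SDK's own validation) illustrates that rule with inspect. It assumes params and spark are optional, non-data arguments, as with the spark argument that aggregate() takes later in this post; the two functions are signature-only stand-ins.

```python
import inspect

# Assumed optional, non-data arguments (aggregate() later in this post takes `spark`).
NON_DATA_ARGS = {"params", "spark"}

def count_data_inputs(func) -> int:
    """Count the parameters of func that would receive input DataFrames."""
    return sum(1 for name in inspect.signature(func).parameters
               if name not in NON_DATA_ARGS)

def check_inputs_match(func, inputs) -> None:
    """Raise if func's data parameters don't line up one-to-one with inputs."""
    expected, actual = len(inputs), count_data_inputs(func)
    if expected != actual:
        raise ValueError(f"{func.__name__} takes {actual} data input(s), "
                         f"but {expected} input source(s) are configured")

# Signature-only stand-ins for the functions defined in this post.
def transform(raw_s3_data_as_df): ...
def aggregate(source_feature_group, spark): ...

check_inputs_match(transform, ["car_data.csv"])  # one CSV input
check_inputs_match(aggregate, ["car-data"])      # one feature group input
```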

Install the Feature Processor SDK from the SageMaker Python SDK and its extras using the following command:

pip install "sagemaker[feature-processor]"

The number of input parameters in your transformation function must match the number of inputs configured in the @feature_processor decorator. In this case, the @feature_processor decorator has car_data.csv as input and the car-data feature group as output, indicating that this is a batch operation with target_stores set to OfflineStore:

from sagemaker.feature_store.feature_processor import (
    CSVDataSource,
    feature_processor,
)

Define the transform() function to transform the data. This function performs the following actions:

Convert column names to lowercase.
Add the event time to the ingest_time column.
Remove punctuation and replace missing values with NA.

# RAW_CAR_SALES_S3_URI is a hypothetical placeholder for the S3 URI of car_data.csv.
@feature_processor(
    inputs=[CSVDataSource(RAW_CAR_SALES_S3_URI)],
    output=CAR_SALES_FG_ARN,
    target_stores=["OfflineStore"],
)
def transform(raw_s3_data_as_df):
    """Load data from S3, perform basic feature engineering, store it in a Feature Group"""
    from pyspark.sql.functions import regexp_replace
    from pyspark.sql.functions import lit
    import time

    transformed_df = (
        # Strip the dollar sign; it must be escaped because a bare "$" matches end of string
        raw_s3_data_as_df.withColumn("Price", regexp_replace("Price", r"\$", ""))
        # Rename Columns
        .withColumnRenamed("Id", "id")
        .withColumnRenamed("Model", "model")
        .withColumnRenamed("Year", "model_year")
        .withColumnRenamed("Status", "status")
        .withColumnRenamed("Mileage", "mileage")
        .withColumnRenamed("Price", "price")
        .withColumnRenamed("MSRP", "msrp")
        # Add Event Time
        .withColumn("ingest_time", lit(int(time.time())))
        # Remove punctuation and fluff; replace with NA
        .withColumn("mileage", regexp_replace("mileage", r"(,)|(mi\.)", ""))
        .withColumn("mileage", regexp_replace("mileage", "Not available", "NA"))
        .withColumn("price", regexp_replace("price", ",", ""))
        .withColumn("msrp", regexp_replace("msrp", r"(^MSRP\s\$)|(,)", ""))
        .withColumn("msrp", regexp_replace("msrp", "Not specified", "NA"))
        .withColumn("msrp", regexp_replace("msrp", r"\$\d+[a-zA-Z\s]+", "NA"))
        # Drop a leading four-digit year and space from the model name
        .withColumn("model", regexp_replace("model", r"^\d\d\d\d\s", ""))
    )

    return transformed_df
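Because the cleaning rules are plain regular expressions, they can be checked without a Spark session. The following sketch (hypothetical helper names clean_record and clean_msrp, and a hypothetical sample row) applies the same patterns, in the same order, to a single row using Python's re module. Spark's regexp_replace uses Java regular expressions, but these particular patterns should behave the same in both engines.

```python
import re
import time

def clean_msrp(value: str) -> str:
    """Apply transform()'s three msrp rules in order."""
    value = re.sub(r"(^MSRP\s\$)|(,)", "", value)   # "MSRP $49,445" -> "49445"
    value = value.replace("Not specified", "NA")
    return re.sub(r"\$\d+[a-zA-Z\s]+", "NA", value)  # e.g. "$500 price drop" -> "NA"

def clean_record(raw: dict) -> dict:
    """Apply transform()'s renaming and cleanup rules to one raw CSV row."""
    return {
        "id": raw["Id"],
        # Drop a leading four-digit year and space from the model name.
        "model": re.sub(r"^\d\d\d\d\s", "", raw["Model"]),
        "model_year": raw["Year"],
        "status": raw["Status"],
        # Drop commas and the "mi." suffix, then mark missing mileage as NA.
        "mileage": re.sub(r"(,)|(mi\.)", "", raw["Mileage"]).replace("Not available", "NA"),
        # Drop the dollar sign and thousands separators.
        "price": re.sub(r"\$", "", raw["Price"]).replace(",", ""),
        "msrp": clean_msrp(raw["MSRP"]),
        # Event time as Unix epoch seconds, as in transform().
        "ingest_time": int(time.time()),
    }

# Hypothetical raw row in the shape the regular expressions expect.
sample = {"Id": "4", "Model": "2019 Acura MDX Sport Hybrid", "Year": "2019",
          "Status": "Used", "Mileage": "32,675 mi.", "Price": "$40,990.00",
          "MSRP": "Not specified"}
print(clean_record(sample))
```

Note that the mileage rule leaves a trailing space ("32675 "), which matches the mileage values in the Spark output.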

Call the transform() function to store the data in the car-data feature group:

# Execute the FeatureProcessor
transform()

The output of the function, shown below, confirms that the data is ingested successfully into the car-data feature group:

INFO:sagemaker:Ingesting transformed data to arn:aws:sagemaker:us-west-2:416578662734:feature-group/car-data with target_stores: ['OfflineStore']

| id|               model|model_year|status|mileage|   price| msrp|ingest_time|
|  0|    Acura TLX A-Spec|      2022|   New|     NA|49445.00|49445| 1686627154|
|  1|    Acura RDX A-Spec|      2023|   New|     NA|50895.00|   NA| 1686627154|
|  2|    Acura TLX Type S|      2023|   New|     NA|57745.00|   NA| 1686627154|
|  3|    Acura TLX Type S|      2023|   New|     NA|57545.00|   NA| 1686627154|
|  4|Acura MDX Sport H…|      2019|  Used| 32675 |40990.00|   NA| 1686627154|
|  5|    Acura TLX A-Spec|      2023|   New|     NA|50195.00|50195| 1686627154|
|  6|    Acura TLX A-Spec|      2023|   New|     NA|50195.00|50195| 1686627154|
|  7|    Acura TLX Type S|      2023|   New|     NA|57745.00|   NA| 1686627154|
|  8|    Acura TLX A-Spec|      2023|   New|     NA|47995.00|   NA| 1686627154|
|  9|    Acura TLX A-Spec|      2022|   New|     NA|49545.00|   NA| 1686627154|
| 10|Acura Integra w/A…|      2023|   New|     NA|36895.00|36895| 1686627154|
| 11|    Acura TLX A-Spec|      2023|   New|     NA|48395.00|48395| 1686627154|
| 12|Acura MDX Type S …|      2023|   New|     NA|75590.00|   NA| 1686627154|
| 13|Acura RDX A-Spec …|      2023|   New|     NA|55345.00|   NA| 1686627154|
| 14|    Acura TLX A-Spec|      2023|   New|     NA|50195.00|50195| 1686627154|
| 15|Acura RDX A-Spec …|      2023|   New|     NA|55045.00|   NA| 1686627154|
| 16|    Acura TLX Type S|      2023|   New|     NA|56445.00|   NA| 1686627154|
| 17|    Acura TLX A-Spec|      2023|   New|     NA|47495.00|47495| 1686627154|
| 18|   Acura TLX Advance|      2023|   New|     NA|52245.00|52245| 1686627154|
| 19|    Acura TLX A-Spec|      2023|   New|     NA|50595.00|50595| 1686627154|
only showing top 20 rows

We have successfully transformed the input data and ingested it in the car-data feature group.
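The ingest_time column holds Unix epoch seconds, because transform() writes int(time.time()). A small standard-library helper (hypothetical, for inspection only) converts such values to readable UTC timestamps; the values below are taken from the outputs in this post.

```python
from datetime import datetime, timezone

def event_time_to_utc(epoch_seconds: float) -> datetime:
    """Convert an ingest_time value (Unix epoch seconds) to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)

print(event_time_to_utc(1686627154))  # 2023-06-13 03:32:34+00:00 (car-data rows)
print(event_time_to_utc(1686634807))  # 2023-06-13 05:40:07+00:00 (aggregated rows)
```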

Run the @feature_processor code remotely

In this section, we demonstrate running the feature processing code remotely as a Spark application using the @remote decorator. Running on Spark lets feature processing scale to large datasets: Spark distributes processing across a cluster to handle data that is too big for a single machine. The @remote decorator runs local Python code as a single-node or multi-node SageMaker training job.

Use the @remote decorator along with the @feature_processor decorator as follows:

@remote(spark_config=SparkConfig(), instance_type="ml.m5.xlarge", …)
@feature_processor(inputs=[FeatureGroupDataSource(CAR_SALES_FG_ARN)],
                   output=AGG_CAR_SALES_FG_ARN, target_stores=["OfflineStore"], enable_ingestion=False)

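The spark_config parameter indicates that the function runs as a Spark application, and the SparkConfig instance configures Spark settings and dependencies. With enable_ingestion=False, the DataFrame returned by the function is not automatically ingested into the output feature group.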

Define the aggregate() function to aggregate the data using PySpark SQL and user-defined functions (UDFs). This function performs the following actions:

Concatenate model, year, and status to create model_year_status.
Take the average of price to create avg_price.
Take the max value of price to create max_price.
Take the average of mileage to create avg_mileage.
Take the max value of mileage to create max_mileage.
Take the average of msrp to create avg_msrp.
Take the max value of msrp to create max_msrp.
Group by model_year_status.

def aggregate(source_feature_group, spark):
    """Aggregate the data using a SQL query and UDF."""
    import time
    from pyspark.sql.types import StringType

    def custom_concat(*cols, delimiter: str = ""):
        return delimiter.join(cols)

    # Register the Python function so that Spark SQL can call it.
    spark.udf.register("custom_concat", custom_concat, StringType())

    # Expose the input DataFrame (the car-data feature group) to SQL as car_data.
    source_feature_group.createOrReplaceTempView("car_data")

    # Execute SQL string.
    aggregated_car_data = spark.sql(
        f"""
        SELECT
            custom_concat(model, "_", model_year, "_", status) as model_year_status,
            AVG(price) as avg_price,
            MAX(price) as max_price,
            AVG(mileage) as avg_mileage,
            MAX(mileage) as max_mileage,
            AVG(msrp) as avg_msrp,
            MAX(msrp) as max_msrp,
            "{int(time.time())}" as ingest_time
        FROM car_data
        GROUP BY model_year_status
        """
    )

    return aggregated_car_data
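The same grouping can be sketched in plain Python, which can be handy for unit-testing the logic without a Spark session. This is an illustrative sketch with hypothetical names (aggregate_rows, to_number) and sample rows, not part of the Feature Processor API. One deliberate difference: it converts values to numbers and skips unparseable ones such as NA before computing both the average and the maximum, whereas the SQL above applies MAX directly to the STRING-typed columns.

```python
from collections import defaultdict
from statistics import mean

def custom_concat(*cols, delimiter: str = ""):
    # Same joining logic as the UDF registered in aggregate().
    return delimiter.join(cols)

def to_number(value):
    # Unparseable strings such as "NA" become None, playing the role of SQL NULL.
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

def aggregate_rows(rows):
    """Group rows by model_year_status and compute avg_*/max_* for price, mileage, msrp."""
    groups = defaultdict(list)
    for row in rows:
        key = custom_concat(row["model"], "_", row["model_year"], "_", row["status"])
        groups[key].append(row)

    result = {}
    for key, members in groups.items():
        features = {}
        for col in ("price", "mileage", "msrp"):
            values = [v for v in (to_number(m[col]) for m in members) if v is not None]
            # NULLs are skipped; an all-NULL group yields None, like SQL AVG/MAX.
            features[f"avg_{col}"] = mean(values) if values else None
            features[f"max_{col}"] = max(values) if values else None
        result[key] = features
    return result

# Hypothetical cleaned rows for one model/year/status combination.
rows = [
    {"model": "Acura ILX", "model_year": "2016", "status": "Used",
     "price": "14000.00", "mileage": "90000", "msrp": "NA"},
    {"model": "Acura ILX", "model_year": "2016", "status": "Used",
     "price": "16000.00", "mileage": "80000", "msrp": "NA"},
]
print(aggregate_rows(rows))
```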

Run the aggregate() function, which creates a SageMaker training job to run the Spark application:

# Execute the aggregate function
aggregate()

As a result, SageMaker creates a training job to run the Spark application defined earlier, using a Spark runtime environment based on the sagemaker-spark-processing image.

We use SageMaker Training jobs here to run our Spark feature processing application. With SageMaker Training, you can reduce startup times to 1 minute or less by using warm pools, which are unavailable in SageMaker Processing. This makes SageMaker Training better suited to short batch jobs like feature processing, where startup time is important.

To view the details, on the SageMaker console, choose Training jobs under Training in the navigation pane, then choose the job with the name aggregate-<timestamp>.

The output of the aggregate() function includes the training job's logs. Within that output, you will see the aggregated data as follows:

|   model_year_status|         avg_price|max_price|       avg_mileage|max_mileage|avg_msrp|max_msrp|ingest_time|
|Acura CL 3.0_1997…|            7950.0|  7950.00|          100934.0|    100934 |    null|      NA| 1686634807|
|Acura CL 3.2 Type…|            6795.0|  7591.00|          118692.5|    135760 |    null|      NA| 1686634807|
|Acura CL 3_1998_Used|            9899.0|  9899.00|           63000.0|     63000 |    null|      NA| 1686634807|
|Acura ILX 2.0L Te…|         14014.125| 18995.00|         95534.875|     89103 |    null|      NA| 1686634807|
|Acura ILX 2.0L Te…|           15008.2| 16998.00|           94935.0|     88449 |    null|      NA| 1686634807|
|Acura ILX 2.0L Te…|           16394.6| 19985.00|           97719.4|     80000 |    null|      NA| 1686634807|
|Acura ILX 2.0L w/…|14567.181818181818| 16999.00| 96624.72727272728|     98919 |    null|      NA| 1686634807|
|Acura ILX 2.0L w/…|           16673.4| 18995.00|           84848.6|     96637 |    null|      NA| 1686634807|
|Acura ILX 2.0L w/…|12580.333333333334| 14546.00|100207.33333333333|     95782 |    null|      NA| 1686634807|
|Acura ILX 2.0L_20…|         14565.375| 17590.00|         92941.125|     81842 |    null|      NA| 1686634807|
|Acura ILX 2.0L_20…|           14877.9|  9995.00|           99739.5|     89252 |    null|      NA| 1686634807|
|Acura ILX 2.0L_20…|           15659.5| 15660.00|           82136.0|     89942 |    null|      NA| 1686634807|
|Acura ILX 2.0L_20…|17121.785714285714| 20990.00| 78278.14285714286|     96067 |    null|      NA| 1686634807|
|Acura ILX 2.4L (A…|           17846.0| 21995.00|          101558.0|     85974 |    null|      NA| 1686634807|
|Acura ILX 2.4L Pr…|           16327.0| 16995.00|           85238.0|     95356 |    null|      NA| 1686634807|
|Acura ILX 2.4L w/…|           12846.0| 12846.00|           75209.0|     75209 |    null|      NA| 1686634807|
|Acura ILX 2.4L_20…|           18998.0| 18998.00|           51002.0|     51002 |    null|      NA| 1686634807|
|Acura ILX 2.4L_20…|17908.615384615383| 19316.00| 74325.38461538461|     89116 |    null|      NA| 1686634807|
|Acura ILX 4DR SDN…|           18995.0| 18995.00|           37017.0|     37017 |    null|      NA| 1686634807|
|Acura ILX 8-SPD_2…|           24995.0| 24995.00|           22334.0|     22334 |    null|      NA| 1686634807|
only showing top 20 rows
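Some rows above look inconsistent at first glance, for example avg_price 14877.9 with max_price 9995.00, or max_msrp NA next to a null avg_msrp. A likely explanation is that price, mileage, and msrp are STRING features: MAX compares strings lexicographically, while AVG implicitly casts them to numbers and treats unparseable values such as NA as null (assuming Spark's default, non-ANSI SQL behavior). The plain-Python snippet below reproduces the string-versus-number difference with hypothetical values; in Spark SQL, an expression such as MAX(CAST(price AS DOUBLE)) is one way to get a numeric maximum.

```python
def to_number(value):
    # Like a non-ANSI cast: strings that aren't numbers (e.g. "NA") become None.
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

prices = ["9995.00", "14995.00", "18500.00"]  # hypothetical string-typed prices
msrps = ["49445", "NA"]                       # hypothetical string-typed MSRPs

# max() over strings compares character by character, like MAX on a STRING column.
print(max(prices))  # 9995.00
print(max(msrps))   # NA

# Casting first gives a numeric result; unparseable values are dropped like NULLs.
numeric_prices = [p for p in map(to_number, prices) if p is not None]
print(max(numeric_prices))  # 18500.0
```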

When the training job is complete, you should see the following output:

06-13 05:40 smspark-submit INFO     spark submit was successful. primary node exiting.
Training seconds: 153
Billable seconds: 153

Operationalize the feature processor via SageMaker pipelines

In this section, we demonstrate how to operationalize the feature processor by promoting it to a SageMaker pipeline and scheduling runs.

First, upload the file containing the feature processing logic to Amazon S3:

from sagemaker.s3 import S3Uploader, s3_path_join

car_data_s3_uri = s3_path_join("s3://", sagemaker_session.default_bucket(),
                               "transformation_code", "")
S3Uploader.upload(local_path="", desired_s3_uri=car_data_s3_uri)

Next, create a Feature Processor pipeline car_data_pipeline using the .to_pipeline() function:

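import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import TransformationCode

car_data_pipeline_name = f"{CAR_SALES_FG_NAME}-ingestion-pipeline"
car_data_pipeline_arn = fp.to_pipeline(pipeline_name=car_data_pipeline_name,
                                       step=transform,
                                       transformation_code=TransformationCode(s3_uri=car_data_s3_uri))
print(f"Created SageMaker Pipeline: {car_data_pipeline_arn}.")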

To run the pipeline, use the following code:

car_data_pipeline_execution_arn = fp.execute(pipeline_name=car_data_pipeline_name)
print(f"Started an execution with execution arn: {car_data_pipeline_execution_arn}")

Similarly, you can create a pipeline for aggregated features called car_data_aggregated_pipeline and start a run.
Schedule the car_data_aggregated_pipeline to run every 24 hours:

fp.schedule(pipeline_name=f"{AGG_CAR_SALES_FG_NAME}-ingestion-pipeline",
            schedule_expression="rate(24 hours)", state="ENABLED")
print("Created a schedule.")

In the output section, you will see the ARN of the pipeline, the pipeline execution role, and the schedule details:

{'pipeline_arn': 'arn:aws:sagemaker:us-west-2:416578662734:pipeline/car-data-aggregated-ingestion-pipeline',
 'pipeline_execution_role_arn': 'arn:aws:iam::416578662734:role/service-role/AmazonSageMaker-ExecutionRole-20230612T120731',
 'schedule_arn': 'arn:aws:scheduler:us-west-2:416578662734:schedule/default/car-data-aggregated-ingestion-pipeline',
 'schedule_expression': 'rate(24 hours)',
 'schedule_state': 'ENABLED',
 'schedule_start_date': '2023-06-13T06:05:17Z',
 'schedule_role': 'arn:aws:iam::416578662734:role/service-role/AmazonSageMaker-ExecutionRole-20230612T120731'}
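The schedule_expression uses the rate(value unit) form of Amazon EventBridge Scheduler schedule expressions (note the arn:aws:scheduler schedule ARN in the output). As a hypothetical helper for sanity-checking the interval locally, this sketch parses such an expression into a timedelta. It handles only minute, hour, and day units, accepts singular or plural forms, and does not cover cron expressions.

```python
import re
from datetime import timedelta

# Singular and plural units both map to a timedelta keyword argument.
_UNITS = {"minute": "minutes", "minutes": "minutes",
          "hour": "hours", "hours": "hours",
          "day": "days", "days": "days"}

def rate_to_timedelta(expression: str) -> timedelta:
    """Convert a rate expression such as "rate(24 hours)" to a timedelta."""
    match = re.fullmatch(r"rate\((\d+)\s+(minutes?|hours?|days?)\)", expression.strip())
    if match is None:
        raise ValueError(f"unsupported rate expression: {expression!r}")
    value, unit = int(match.group(1)), match.group(2)
    return timedelta(**{_UNITS[unit]: value})

print(rate_to_timedelta("rate(24 hours)"))  # 1 day, 0:00:00
```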

To get all the Feature Processor pipelines in this account, use the list_pipelines() function on the Feature Processor:

fp.list_pipelines()

The output will be as follows:

[{'pipeline_name': 'car-data-aggregated-ingestion-pipeline'},
 {'pipeline_name': 'car-data-ingestion-pipeline'}]

We have successfully created SageMaker Feature Processor pipelines.

Explore feature processing pipelines and ML lineage

In SageMaker Studio, complete the following steps:

On the SageMaker Studio console, on the Home menu, choose Pipelines.

You should see two pipelines created: car-data-ingestion-pipeline and car-data-aggregated-ingestion-pipeline.

Choose the car-data-ingestion-pipeline.

It shows the run details on the Executions tab.

To view the feature groups populated by the pipelines, choose Feature Store under Data.

You will see the two feature groups we created in the previous steps.

Choose the car-data feature group.

You will see the feature details on the Features tab.

View pipeline runs

To view the pipeline runs, complete the following steps:

On the Pipeline Executions tab, select car-data-ingestion-pipeline.

This will show all the runs.

Choose one of the links to see the details of the run.

To view lineage, choose Lineage.

The full lineage for car-data shows the input data source car_data.csv and upstream entities. The lineage for car-data-aggregated shows the input car-data feature group.

Choose Load features and then choose Query upstream lineage on car-data and car-data-ingestion-pipeline to see all the upstream entities.

The full lineage for car-data feature group should look like the following screenshot.

Similarly, the lineage for the car-data-aggregated feature group should look like the following screenshot.

SageMaker Studio provides a single environment to track scheduled pipelines, view runs, explore lineage, and view the feature processing code.

The aggregated features such as average price, max price, average mileage, and more in the car-data-aggregated feature group provide insight into the nature of the data. You can also use these features as a dataset to train a model to predict car prices, or for other operations. However, training the model is out of scope for this post, which focuses on demonstrating the SageMaker Feature Store capabilities for feature engineering.

Clean up

Don’t forget to clean up the resources created as part of this post to avoid incurring ongoing charges.

Disable the scheduled pipeline by calling fp.schedule() with the state parameter set to DISABLED:

# Disable the scheduled pipeline
fp.schedule(pipeline_name=f"{AGG_CAR_SALES_FG_NAME}-ingestion-pipeline",
            schedule_expression="rate(24 hours)",
            state="DISABLED")

Delete both feature groups:

car_sales_fg.delete()
agg_car_sales_fg.delete()

The data residing in the S3 bucket and offline feature store can incur costs, so you should delete them to avoid any charges.

Delete the S3 objects.
Delete the records from the feature store.

Conclusion

In this post, we demonstrated how a car sales company used SageMaker Feature Store Feature Processor to gain valuable insights from their raw sales data by:

Ingesting and transforming batch data at scale using Spark
Operationalizing feature engineering workflows via SageMaker pipelines
Providing lineage tracking and a single environment to monitor pipelines and explore features
Preparing aggregated features optimized for training ML models

By following these steps, the company was able to transform previously unusable data into structured features that could then be used to train a model to predict car prices. SageMaker Feature Store enabled them to focus on feature engineering rather than the underlying infrastructure.

We hope this post helps you unlock valuable ML insights from your own data using SageMaker Feature Store Feature Processor!

For more information on this, refer to Feature Processing and the SageMaker example on Amazon SageMaker Feature Store: Feature Processor Introduction.

About the Authors

Dhaval Shah is a Senior Solutions Architect at AWS, specializing in Machine Learning. With a strong focus on digital native businesses, he empowers customers to leverage AWS and drive their business growth. As an ML enthusiast, Dhaval is driven by his passion for creating impactful solutions that bring positive change. In his leisure time, he indulges in his love for travel and cherishes quality moments with his family.

Ninad Joshi is a Senior Solutions Architect at AWS, helping global AWS customers design secure, scalable, and cost-effective solutions in the cloud to solve their complex real-world business challenges. His work in Machine Learning (ML) covers a wide range of AI/ML use cases, with a primary focus on End-to-End ML, Natural Language Processing, and Computer Vision. Prior to joining AWS, Ninad worked as a software developer for 12+ years. Outside of his professional endeavors, Ninad enjoys playing chess and exploring different gambits.

Read More: AWS Machine Learning Blog


