Sunday, October 13, 2024
No menu items!
HomeCloud ComputingAnnouncing Serverless Spark components for Vertex AI Pipelines

Announcing Serverless Spark components for Vertex AI Pipelines

Developers and ML engineers face a variety of challenges when it comes to operationalizing Spark ML workloads. One set of challenges may come in the form of infrastructure concerns, for example, how to provision infrastructure clusters in advance, how to ensure that there are enough resources to run different kinds of tasks like data preparation, model training, and model evaluation in a reasonable time. Another set of challenges could come from task orchestration and data handling, for example, how to ensure that the most up-to-date features are available when a model training task is run.

In order to solve those challenges, you can use Vertex AI Pipelines to automate ML workflows in conjunction with Dataproc for running serverless Spark workloads. For example, this article shows you how to train and deploy a Spark ML model to achieve near real-time predictions, without provisioning any infrastructure in advance. In particular, It proves how to launch and orchestrate Dataproc jobs from Vertex AI Pipelines by using custom Python components.

Today we are excited to announce the official release of new Dataproc Serverless components for Vertex AI Pipelines that further simplify MLOps for Spark, Spark SQL, PySpark and Spark jobs. 

The following components are now available:

DataprocPySparkBatchOp: PySpark batch workloadsDataprocSparkBatchOp: Spark batch workloadsDataprocSparkSqlBatchOp: Spark SQL batch workloadsDataprocSparkRBatchOp: SparkR batch workloads

With those components, you have native KFP operators to easily orchestrate Spark-based ML pipelines with Vertex AI Pipelines and Dataproc Serverless. You just need to build their preprocessing, training and postprocessing steps and compile the KFP pipeline. Then, thanks to Vertex AI Pipeline and Dataproc Serverless, you will run ML workflow in a reliable, scalable and reproducible way without requiring to provision and manage the infrastructure.

To learn more about how to use these components, you can find a getting started tutorial below

https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb 

In addition to that, below you have an end-to-end example of using PySpark and DataprocPySparkBatchOp component. 

Training Loan Eligibility’s Model using Pyspark and Dataproc serverless in a Vertex AI Pipeline

In this section, we will show you how to build a Spark ML pipeline using Spark MLlib and DataprocPySparkBatchOp component to determine the customer eligibility for a loan from a banking company. In particular, the pipeline covers a Spark MLlib pipeline, from data preprocessing to hyperparameter tuning of a random forest classifier which predicts the probability of a customer being eligible for a loan. 

Below you have the pipeline view:

The pipeline workflow (Click to enlarge)

As shown in the diagram, the pipeline:

Stores data in a Cloud storage

Imputes categorical and numerical variables with DataprocPySparkBatchOp

Trains an RandomForestClassifier with DataprocPySparkBatchOp

Runs a custom component to evaluate and  represents model metrics in Vertex AI Pipelines UI

If the model respects the performance condition (area under the precision-recall curve, auPR for short, is higher that a minimum value of 0.5), then

Hypertune the RandomForestClassifier with DataprocPySparkBatchOp

Register the model in the Vertex AI Model Registry

To simplify, let’s consider the training step in order to show how DataprocPySparkBatchOp works. 

Train PySpark MLlib model using DataprocPySparkBatchOp component

In the example, we train a Random Forest Classifier using PySpark and Spark MLlib as a pipeline step. To use the DataprocPySparkBatchOp component to execute the training in Dataproc Serverless, you first need to create the training script. 

Below you have the one you will find in the GitHub repo:

code_block[StructValue([(u’code’, u’def main(logger, args):rn “””rn Main functionrn Args:rn logger: loggerrn args: argsrn Returns:rn Nonern “””rn train_path = args.train_pathrn model_path = args.model_pathrn metrics_path = args.metrics_pathrnrn try:rn logger.info(‘initializing pipeline training.’)rn logger.info(‘start spark session.’)rn spark = (SparkSession.builderrn .master(“local[*]”)rn .appName(“spark go live”)rn .config(‘spark.ui.port’, ‘4050’)rn .getOrCreate())rn logger.info(f’spark version: {spark.sparkContext.version}’)rn logger.info(‘start building pipeline.’)rn preprocessing_stages = build_preprocessing_components()rn feature_engineering_stages = build_feature_engineering_components()rn model_training_stage = build_training_model_component()rn pipeline = build_pipeline(preprocessing_stages, feature_engineering_stages, model_training_stage)rnrn logger.info(f’load train data from {train_path}.’)rn raw_data = (spark.read.format(‘csv’)rn .option(“header”, “true”)rn .schema(DATA_SCHEMA)rn .load(train_path))rnrn logger.info(‘fit model pipeline.’)rn train, test = raw_data.randomSplit(RANDOM_QUOTAS, seed=RANDOM_SEED)rn pipeline_model = pipeline.fit(train)rn predictions = pipeline_model.transform(test)rn metrics = get_metrics(predictions, TARGET, ‘test’)rn for m, v in metrics.items():rn print(f'{m}: {v}’)rnrn logger.info(f’load model pipeline in {model_path}.’)rn pipeline.write().overwrite().save(model_path)rn if model_path.startswith(‘gs://’):rn pipeline.write().overwrite().save(model_path)rn else:rn path(model_path).mkdir(parents=True, exist_ok=True)rn pipeline.write().overwrite().save(model_path)rnrn logger.info(f’upload metrics under {metrics_path}.’)rn if metrics_path.startswith(‘gs://’):rn bucket = urlparse(model_path).netlocrn metrics_file_path = urlparse(metrics_path).path.strip(‘/’)rn write_metrics(bucket, metrics, metrics_file_path)rn else:rn metrics_version_path = path(metrics_path).parents[0]rn metrics_version_path.mkdir(parents=True, exist_ok=True)rn with open(metrics_path, ‘w’) as json_file:rn json.dump(metrics, json_file)rn json_file.close()rn except RuntimeError as main_error:rn logger.error(main_error)rn else:rn logger.info(‘model pipeline training successfully completed!’)rn return 0′), (u’language’, u”)])]

For the full code, please see this notebook.

Once the Spark session has been initialized, the script builds the Spark ML pipeline, loads preprocessed data,  generates train and test samples, trains the model and saves artifacts and metrics to a Cloud Storage bucket.

Before using the PySpark script, it needs to be uploaded to a Cloud Storage bucket:

code_block[StructValue([(u’code’, u’!gsutil cp $SRC/model_training.py $BUCKET_URI/src/model_training.py’), (u’language’, u”)])]

For the full code, please see this notebook.

Once the script has been uploaded to Cloud Storage, you can use the DataprocPySparkBatchOp to define your training component. The value of the main_python_file_uri argument should be the location of the PySpark script within Cloud Storage.

Here you have what we define

code_block[StructValue([(u’code’, u’model_traning_op = DataprocPySparkBatchOp(rn project=project_id,rn location=location,rn container_image=custom_container_image,rn batch_id=training_batch_id,rn main_python_file_uri=training_main_python_file_uri,rn args=build_training_args_op.output,rn ).after(build_training_args_op)’), (u’language’, u”)])]

For the full code, please see this notebook.

DataprocPySparkBatchOp requires you to specify values for the following parameters: 

project, the Project to run the Dataproc batch workload

batch_id, a unique ID to use for the batch job

main_python_file_uri, the HCFS URI of the main Python file to use as the Spark driver

The DataprocPySparkBatchOp component accepts other optional parameters that might be necessary for your workload. To learn more about the component, check out the documentation

Finally, you integrate this component with other tasks in a pipeline definition by using the dsl.pipeline decorator. You then  compile the pipeline definition and run it using the Vertex AI SDK.

While running, the pipeline would submit the training workload to Dataproc Serverless service when the model_traning_op step is executed.  You can see the batch workload details in the Cloud Console after the job successfully runs.

The Batches view of Training job in the Dataproc UI (Click to enlarge)

At this point, you can see how it’s simple to integrate Spark jobs into your machine learning workflow by using the Dataproc Serverless components for Vertex AI Pipelines.  Dataproc Serverless takes care of managing all the infrastructure, and the training workload consumes only the resources it requires for the time it is running.

You can integrate other tasks into your pipeline using a similar approach. The code below is an example pipeline definition that executes a complete machine learning workflow, including data preprocessing and dataset creation, model training, model evaluation, hyperparameter tuning, and uploading the model to Vertex AI Model Registry. The pipeline uses DataprocPySparkBatchOp to execute PySpark workloads on Dataproc Serverless, and other components that are part of the Google Cloud Pipeline Components SDK.

code_block[StructValue([(u’code’, u’@dsl.pipeline(name=”dataproc-pyspark-preprocessing”,rn description=””)rndef pipeline(rn preprocessing_batch_id: str = PREPROCESSING_BATCH_ID,rn preprocessing_main_python_file_uri: str = PREPROCESSING_PYTHON_FILE_URI,rn train_data_path: str = FEATURES_TRAIN_URI,rn preprocessed_data_path: str = PROCESSED_DATA_URI,rn dataset_name: str = DATASET_NAME,rn dataset_uri: str = GCS_PREPROCESSED_URI,rn training_batch_id: str = TRAINING_BATCH_ID,rn training_main_python_file_uri: str = TRAINING_PYTHON_FILE_URI,rn train_path: str = PROCESSED_DATA_URI,rn model_path: str = MODEL_URI,rn metrics_path: str = METRICS_URI,rn threshold:float = AUPR_THRESHOLD,rn hpt_batch_id: str = HPT_TRAINING_BATCH_ID,rn hpt_main_python_file_uri: str = HPT_PYTHON_FILE_URI,rn hpt_model_path: str = HPT_MODEL_URI,rn hpt_metrics_path: str = HPT_METRICS_URI,rn custom_container_image: str = RUNTIME_CONTAINER_IMAGE,rn model_name: str = MODEL_NAME,rn project_id: str = PROJECT_ID,rn location: str = REGION,rn):rnrn from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOprnrn #build preprocessed data argsrn build_preprocessing_args_op = build_preprocessing_args(rn train_data_path=train_data_path,rn processed_data_path=preprocessed_data_pathrn )rnrn # preprocess datarn data_preprocessing_op = DataprocPySparkBatchOp(rn project=project_id,rn location=location,rn container_image=custom_container_image,rn batch_id=preprocessing_batch_id,rn main_python_file_uri=preprocessing_main_python_file_uri,rn args=build_preprocessing_args_op.outputrn ).after(build_preprocessing_args_op)rnrn # create datasetrn create_dataset_op = vertex_ai_components.TabularDatasetCreateOp(rn display_name=dataset_name,rn gcs_source=dataset_uri,rn project=project_id,rn location=location,rn ).after(data_preprocessing_op)rn rn # build training data argsrn build_training_args_op = build_training_args(rn dataset_uri = create_dataset_op.output,rn train_path=train_path,rn model_path=model_path,rn metrics_path=metrics_path,rn ).after(create_dataset_op)rnrn # training modelrn model_training_op = DataprocPySparkBatchOp(rn project=project_id,rn location=location,rn container_image=custom_container_image,rn batch_id=training_batch_id,rn main_python_file_uri=training_main_python_file_uri,rn args=build_training_args_op.output,rn ).after(build_training_args_op)rnrn evaluate_model_op = evaluate_model(rn metrics_uri=metrics_pathrn ).after(model_traning_op)rn rn # evaluate conditionrn with dsl.Condition(evaluate_model_op.outputs[‘threshold_metric’] >= threshold, name=AUPR_HYPERTUNE_CONDITION):rnrn build_hpt_args_op = build_training_args(rn dataset_uri = create_dataset_op.output,rn train_path=train_path,rn model_path=hpt_model_path,rn metrics_path=hpt_metrics_path,rn ).after(evaluate_model_op)rnrn # hypertuning modelrn hyperparameter_tuning_op = DataprocPySparkBatchOp(rn project=project_id,rn location=location,rn container_image=custom_container_image,rn batch_id=hpt_batch_id,rn main_python_file_uri=hpt_main_python_file_uri,rn args=build_hpt_args_op.output,rn ).after(model_traning_op)rnrn # upload modelrn register_model(artifact_uri=hpt_model_path).after(hyperparameter_tuning_op)’), (u’language’, u”)])]

For the full code, please see this notebook

Vertex AI Pipelines allows you to visualize your machine learning workflow as it executes. The following image shows a completed end-to-end execution using the pipeline definition above.

The pipeline in the Vertex AI Pipelines UI (Click to enlarge)

Conclusion

In this blogpost, we announced new Dataproc componentsnow available for Vertex AI Pipelines. We also provided an end-to-end example of how to use the DataprocPySparkBatchOp to preprocess data, train and hypertune a PySpark MLlib model for loan eligibility. 

What’s Next

Do you want to know more about Dataproc serverless, Vertex AI Pipelines and how to deploy Spark models on Vertex AI? Check out the following resources:

Documentation

What is Dataproc Serverless? 

Vertex AI Pipelines

Serving Spark ML models using Vertex AI | Cloud Architecture Center

Code Labs

Intro to Vertex Pipelines

Using Vertex ML Metadata with Pipelines

Github samples

Video Series: AI Simplified: Vertex AI

Quick Lab: Build and Deploy Machine Learning Solutions on Vertex AI

Special thanks to Abhishek Kashyap, Henry Tappen, Mikhail Chrestkha, Karthik Ramachadran, Yang Pan, Karl Weinmeister, Andrew Ferlitsch for their support and contributions to this blogpost.

Related Article

Announcing BigQuery and BigQuery ML operators for Vertex AI Pipelines

Announcing the official release of new BigQuery and BigQueryML components for Vertex AI Pipelines that help make it easier to operational…

Read Article

Cloud BlogRead More

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments