Tuesday, April 23, 2024

Tune Spark properties to optimize Dataproc Serverless jobs

Data scientists and engineers need large-scale compute resources but often lack the know-how or the engineering team to create and manage the underlying infrastructure. Dataproc Serverless, the industry’s first autoscaling serverless product for Spark, simplifies their experience by removing the need to think about infrastructure.

Dataproc Serverless uses Spark properties to determine the compute, memory, and disk resources to allocate to your batch workload. These property settings can affect workload quota consumption and cost (see Dataproc Serverless quotas and Dataproc Serverless pricing for more information).

This blog post will demonstrate how you can use these Spark properties to meet the runtime and cost service level agreements of your Spark jobs.

Before you begin

This blog post assumes the following:

You have an Apache Spark jar available for Dataproc Serverless.

Your project has the appropriate quota for testing.

You have no insight into how much CPU, memory, or disk your job requires.

You’ve exhausted code optimization opportunities.

You have tried other methods, such as optimizing GCS file structures and file types.

Dataproc Serverless and Apache Spark tuning is an iterative process. This guide outlines the steps to fine-tune your Spark properties to meet cost and performance requirements. Expect to submit your job several times.

Dataproc Serverless pricing

Dataproc Serverless for Spark pricing is based on the number of Data Compute Units (DCUs) and the amount of shuffle storage used. DCUs and shuffle storage are billed per second, with a 1 minute minimum charge.

For more information, visit: Dataproc Serverless pricing
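
To make the billing model concrete, here is a minimal sketch of the arithmetic. The function name and both rate parameters are hypothetical inputs invented for illustration, not published prices, so look up the current rates on the pricing page. The sketch also assumes the one-minute minimum applies to the billed duration of the whole workload.

```python
def estimate_batch_cost(dcus, shuffle_gb, runtime_seconds,
                        dcu_rate_per_hour, shuffle_rate_per_gb_hour):
    """Rough cost estimate for one batch workload (illustrative only).

    dcu_rate_per_hour and shuffle_rate_per_gb_hour are placeholder rates
    supplied by the caller; they are not real prices.
    """
    # Billing is per second, with a 1-minute minimum charge.
    billed_seconds = max(runtime_seconds, 60)
    billed_hours = billed_seconds / 3600

    dcu_cost = dcus * billed_hours * dcu_rate_per_hour
    shuffle_cost = shuffle_gb * billed_hours * shuffle_rate_per_gb_hour
    return dcu_cost + shuffle_cost
```

A 30-second job and a 60-second job cost the same under this model, which is worth remembering when you run many short test submissions.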

Spark Persistent History Server and GCS Logging Output

Set up a Spark Persistent History Server (PHS) for your jobs to help with debugging and troubleshooting. Executor and driver logs are available in Cloud Logging while the Spark job is running, and running applications are visible in the PHS (PHS > Incomplete Applications). The PHS adds the Spark event logs, which provide insight into Spark application execution (DAG, executor events, and so on). The event logs are stored in GCS, and the PHS provides the UI to access them. Note that when the PHS is not running, Spark won’t send the logs to GCS.

    gcloud dataproc clusters create <phs-cluster-name> \
        --region=<region> \
        --single-node \
        --enable-component-gateway \
        --properties=spark:spark.history.fs.logDirectory=gs://<bucket>/phs/*/spark-job-history

Additionally, set up a Google Cloud Storage bucket and pass these properties to your Spark job.

    spark.eventLog.enabled=true,spark.eventLog.dir=<gcs_bucket>

Tracking job performance/cost progress

Throughout your iterative experimentation, keep track of each job’s properties and the impact on runtime and cost.
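
One lightweight way to do this is to keep a small run log. The sketch below is an assumption about how you might structure it; the `RunRecord` fields and helper names are hypothetical, and the runtime and DCU figures are values you would copy in by hand from the batch console.

```python
import csv
import io
from dataclasses import dataclass, asdict


@dataclass
class RunRecord:
    run_id: str             # any label you choose for the submission
    properties: str         # the --properties string used for this run
    runtime_seconds: float  # observed job runtime
    dcu_hours: float        # observed DCU usage, entered manually


def cheapest_run_within(records, max_runtime_seconds):
    """Return the lowest-DCU run that meets the runtime target, or None."""
    eligible = [r for r in records if r.runtime_seconds <= max_runtime_seconds]
    return min(eligible, key=lambda r: r.dcu_hours, default=None)


def to_csv(records):
    """Serialize the run log so it can be pasted into a spreadsheet."""
    buf = io.StringIO()
    writer = csv.DictWriter(
        buf, fieldnames=["run_id", "properties", "runtime_seconds", "dcu_hours"]
    )
    writer.writeheader()
    for record in records:
        writer.writerow(asdict(record))
    return buf.getvalue()
```

Calling `cheapest_run_within(records, max_runtime_seconds)` after each experiment shows which configuration currently meets your runtime target at the lowest DCU usage.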

Submitting your job

    gcloud dataproc batches submit spark \
        --region=<your-region> \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.SparkPi \
        --history-server-cluster=projects/<project_id>/regions/<region>/clusters/<phs-cluster-name> \
        -- 1000

If your job runtime and DCU usage are satisfactory, you can stop here. Both values appear on the Details tab of the Dataproc Serverless batch console.

As your job runs, monitor the maximum-needed value in the Monitoring tab of the Dataproc Serverless batch console.

In one example run, maximum-needed = 36.5, but the job was allocated only 1 executor.

The maximum-needed value is computed by adding the count of running tasks and pending tasks, and dividing by the tasks per executor.  Dataproc Serverless will attempt to meet this demand based on the autoscaling-related properties passed during job submission.  These properties are discussed in the next section.
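
The calculation described above can be sketched as a one-line function. The function name is hypothetical, and the task counts in the usage note are made-up inputs chosen to reproduce the 36.5 figure. In open-source Spark, tasks per executor is normally `spark.executor.cores` divided by `spark.task.cpus`; treat that as an assumption for your own workload.

```python
def maximum_needed(running_tasks, pending_tasks, tasks_per_executor):
    """Executors needed to run every running and pending task at once."""
    if tasks_per_executor <= 0:
        raise ValueError("tasks_per_executor must be positive")
    return (running_tasks + pending_tasks) / tasks_per_executor
```

For instance, 100 running tasks plus 46 pending tasks at 4 tasks per executor gives `maximum_needed(100, 46, 4) == 36.5`.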

Fine-tuning your Spark properties

Allocated job resources

For the sake of simplicity, below are three example configurations for varying workloads. Experiment with each configuration to determine which best suits your job. To view the default Spark property values and learn more, see the Spark properties page in the Dataproc Serverless documentation.

    --properties=spark.driver.cores=4,spark.executor.cores=4,spark.driver.memory=25g,spark.driver.memoryOverhead=4g,spark.executor.memory=25g,spark.executor.memoryOverhead=4g

    --properties=spark.driver.cores=8,spark.executor.cores=8,spark.driver.memory=50g,spark.driver.memoryOverhead=8g,spark.executor.memory=50g,spark.executor.memoryOverhead=8g

    --properties=spark.driver.cores=16,spark.executor.cores=16,spark.executor.instances=500,spark.driver.memory=100g,spark.driver.memoryOverhead=16g,spark.executor.memory=100g,spark.executor.memoryOverhead=16g

Autoscaling

Autoscaling helps cut costs by using only the resources a job requires, but it may affect the runtime and stability of the job. Leave the autoscaling properties at their defaults unless you find that the job’s runtime or stability does not meet your needs.

Several properties influence scale range and rate, elaborated below. To view the default Autoscaling property values and learn more visit: Dataproc Serverless for Spark autoscaling.

Range

Configuring a range gives Dataproc Serverless jobs a minimum and maximum number of executors. You can also set an initial number (the default is 2). A sensible initial number of executor instances reduces runtime because less time is spent scaling up. Setting a maximum number of executors limits cost.
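
As a rough illustration, the range can be expressed with the `spark.dynamicAllocation.minExecutors`, `initialExecutors`, and `maxExecutors` properties that appear in the examples later in this post. The helper below is a hypothetical convenience function, not part of any Google tool; it only builds the flag string and checks that the three values are consistent.

```python
def autoscaling_range_properties(min_executors, initial_executors, max_executors):
    """Build a --properties flag for an executor range (illustrative helper)."""
    if not (min_executors <= initial_executors <= max_executors):
        raise ValueError("expected min <= initial <= max")
    props = {
        "spark.dynamicAllocation.minExecutors": min_executors,
        "spark.dynamicAllocation.initialExecutors": initial_executors,
        "spark.dynamicAllocation.maxExecutors": max_executors,
    }
    return "--properties=" + ",".join(f"{k}={v}" for k, v in props.items())
```

Calling `autoscaling_range_properties(2, 10, 100)` yields a single flag you can append to the batch submit command.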

Rate

When autoscaling, Dataproc Serverless jobs compare the current number of executors vs. the maximum-needed (total number of running tasks and pending tasks, divided by the tasks per executor) and will increase/decrease the current number of executors to reach equilibrium. The rate of increase/decrease is determined by executorAllocationRatio and decommission.maxRatio.

The executorAllocationRatio property determines how quickly the job scales UP. The decommission.maxRatio property determines how quickly the job scales DOWN. 

Decommissioning too many executors at the same time while shuffle or RDD data is being migrated can severely hurt shuffle fetch performance. It is therefore best to set decommission.maxRatio to a low value, in the 0.1 to 0.3 range, to reduce the performance regression caused by decommissioning.
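
To build intuition for the two ratios, here is a simplified model. It is a sketch under stated assumptions, not the scheduler’s actual implementation. The scale-up formula follows open-source Spark, where the requested executor count is the maximum-needed value multiplied by executorAllocationRatio and rounded up. The scale-down rule (remove at most a maxRatio fraction of the current executors per step, and at least one) is an assumption made for illustration. Both function names are hypothetical.

```python
import math


def scale_up_target(max_needed, allocation_ratio):
    """Executors requested on scale-up, damped by executorAllocationRatio."""
    return math.ceil(max_needed * allocation_ratio)


def scale_down_step(current, target, decommission_max_ratio):
    """Executor count after one decommission step toward target.

    Assumed model: at most decommission_max_ratio of the current executors
    (but at least one) can be removed in a single step.
    """
    max_removable = max(1, math.floor(current * decommission_max_ratio))
    return max(target, current - max_removable)
```

With a maximum-needed value of 36.5, a ratio of 1.0 requests 37 executors while a ratio of 0.5 requests 19, so lower ratios trade parallelism for cost. On the way down, a lower maxRatio means more, smaller steps and less shuffle data in flight at once.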

For the sake of simplicity, here are three example configurations for autoscaling rate:

    --properties=spark.dynamicAllocation.executorAllocationRatio=0.3,spark.decommission.maxRatio=0.2

    --properties=spark.dynamicAllocation.executorAllocationRatio=0.5,spark.decommission.maxRatio=0.2

    --properties=spark.dynamicAllocation.executorAllocationRatio=1.0,spark.decommission.maxRatio=0.2

No Autoscaling

Once you know how many executors your job needs from the maximum-needed value, you can avoid the time autoscaling takes to scale up by setting the minimum and maximum executors to the same number, which effectively turns autoscaling off. Your job then has exactly that number of executors from the start and throughout the run. Because dynamicAllocation is still enabled, you’ll still be able to see the Monitoring tab in the Dataproc Serverless batch console.

    --properties=spark.dynamicAllocation.enabled=true,spark.dynamicAllocation.minExecutors=100,spark.dynamicAllocation.maxExecutors=100,spark.executor.instances=100
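
If you want to derive this flag from an observed maximum-needed value, a minimal sketch might look like the following. The helper name is hypothetical, and rounding up to a whole executor is an assumption on my part; you may prefer to add headroom or round differently.

```python
import math


def fixed_executor_properties(max_needed):
    """Pin min, max, and initial executors to the observed maximum-needed."""
    executors = math.ceil(max_needed)  # assumption: round up to a whole executor
    return (
        "--properties=spark.dynamicAllocation.enabled=true,"
        f"spark.dynamicAllocation.minExecutors={executors},"
        f"spark.dynamicAllocation.maxExecutors={executors},"
        f"spark.executor.instances={executors}"
    )
```

For the earlier reading of 36.5, this pins the job at 37 executors.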

Compute and disk tiers

One more way to boost the performance of your Dataproc Serverless job is to utilize compute and disk tiers. By default, your jobs will be set to standard tier. If you want higher per-core performance (compute) or better IOPS and throughput (disk), consider setting these properties to premium tier. Be aware that jobs will be billed at a higher rate with premium tiers.

Putting it all together

Consider the above options when submitting your job. Some example scenarios:

1. A job requires medium allocated job resources and moderate autoscaling:

    --properties=spark.driver.cores=8,spark.executor.cores=8,spark.driver.memory=50g,spark.driver.memoryOverhead=8g,spark.executor.memory=50g,spark.executor.memoryOverhead=8g,spark.dynamicAllocation.enabled=true,spark.reducer.fetchMigratedShuffle.enabled=true,spark.dynamicAllocation.executorAllocationRatio=0.3,spark.decommission.maxRatio=0.2

2. A job requires maximum allocated job resources and fast autoscaling, and you know the initial number of executors should be 50:

    --properties=spark.driver.cores=16,spark.executor.cores=16,spark.driver.memory=100g,spark.driver.memoryOverhead=16g,spark.executor.memory=100g,spark.executor.memoryOverhead=16g,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium,spark.dataproc.driver.disk.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dynamicAllocation.enabled=true,spark.reducer.fetchMigratedShuffle.enabled=true,spark.dynamicAllocation.executorAllocationRatio=1.0,spark.decommission.maxRatio=0.3,spark.dynamicAllocation.initialExecutors=50,spark.executor.instances=50

3. A small job has a predictable and consistent resource requirement of 20 executors and needs high IOPS:

    --properties=spark.driver.cores=4,spark.executor.cores=4,spark.driver.memory=25g,spark.driver.memoryOverhead=4g,spark.executor.memory=25g,spark.executor.memoryOverhead=4g,spark.dataproc.driver.disk.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dynamicAllocation.enabled=true,spark.reducer.fetchMigratedShuffle.enabled=true,spark.dynamicAllocation.minExecutors=20,spark.dynamicAllocation.maxExecutors=20,spark.executor.instances=20
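
Long property strings like these are easy to mistype, so you may prefer to assemble the submit command from a dictionary. The sketch below is one possible approach; the function name is hypothetical, and it uses only the gcloud flags shown earlier in this post. It returns the command as a string rather than running it.

```python
import shlex


def build_submit_command(region, jar, main_class, properties, job_args=()):
    """Assemble a 'gcloud dataproc batches submit spark' command line."""
    props = ",".join(f"{key}={value}" for key, value in properties.items())
    argv = [
        "gcloud", "dataproc", "batches", "submit", "spark",
        f"--region={region}",
        f"--jars={jar}",
        f"--class={main_class}",
        f"--properties={props}",
        "--",          # everything after this is passed to the Spark job
        *job_args,
    ]
    return shlex.join(argv)
```

Keeping the properties in a dictionary also makes it straightforward to record exactly what each experimental run used.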

Common errors explained

Keep an eye on your project quota. If autoscaling is enabled and the project quota limit is hit, your job will not receive the resources it requires and may fail with a quota-related error. Quota usage can be estimated as follows:

Disk Usage = number_instances * number_cores * disk_size

CPU Usage = number_instances * number_cores
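
The two formulas above can be sketched as a quick pre-submission check. The function names and the quota limits passed in are hypothetical; read your real limits from your project’s quota page. The sketch assumes `disk_size` is expressed in GB.

```python
def quota_usage(number_instances, number_cores, disk_size_gb):
    """Return (cpu_usage, disk_usage) using the formulas above."""
    cpu_usage = number_instances * number_cores
    disk_usage = number_instances * number_cores * disk_size_gb
    return cpu_usage, disk_usage


def fits_quota(number_instances, number_cores, disk_size_gb,
               cpu_quota, disk_quota_gb):
    """True if the estimated usage stays within the given quota limits."""
    cpu_usage, disk_usage = quota_usage(number_instances, number_cores, disk_size_gb)
    return cpu_usage <= cpu_quota and disk_usage <= disk_quota_gb
```

Running this against your maximum executor count before submitting shows whether autoscaling could hit the limit at peak.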

CoarseGrainedSchedulerError

This error usually relates to insufficient resources. Try increasing the driver cores/memory and/or the number of executor instances.

FetchFailedException

Try setting spark.reducer.fetchMigratedShuffle.enabled=true

For more information, visit Spark Dynamic Allocation Issues and Solutions 

Next steps

As discussed in this blog post, Dataproc Serverless uses Spark properties to determine the compute, memory, and disk resources to allocate to your batch workload. These property settings can affect workload quota consumption and cost (see Dataproc Serverless quotas and Dataproc Serverless pricing for more information). Thus, it is essential to understand how these properties function and interact with each other.  

To get started with Dataproc Serverless, check out our Serverless Spark Workshop or the Dataproc Serverless Quickstart.

If you want to learn more, check out some related blog posts:

Simplify data processing and data science jobs with Spark on Google Cloud 

Spark jobs that autoscale and made seamless for all data users

Running Persistent History Servers
