spark-user mailing list archives

From "Thakrar, Jayesh" <>
Subject Re: How to address seemingly low core utilization on a spark workload?
Date Thu, 15 Nov 2018 21:23:34 GMT
So 164 GB of Parquet data can potentially explode to up to 1000 GB of data if the files are compressed
(in practice it would be more like 400-600 GB).
Your executors have about 96 GB data.
With that kind of volume, 100-300 executors is ok (I would do tests with 100-300), but 30k
shuffle partitions are still excessive.
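
A rough sketch of the sizing arithmetic behind that remark. The 200 MB target per shuffle partition is an assumed rule of thumb, not a figure from this thread and not a Spark default; the function name is hypothetical.

```python
import math

def suggested_shuffle_partitions(data_gb, target_partition_mb=200):
    """Number of partitions so that each holds roughly target_partition_mb.

    target_partition_mb is an assumed rule of thumb, not a Spark default.
    """
    return math.ceil(data_gb * 1024 / target_partition_mb)

# Decompressed volumes mentioned above: 400-600 GB typical, 1000 GB worst case.
for data_gb in (400, 600, 1000):
    print(f"{data_gb} GB -> {suggested_shuffle_partitions(data_gb)} partitions")
```

Under that assumption even the 1000 GB worst case comes out at 5120 partitions, well below 30k.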

From: Vitaliy Pisarev <>
Date: Thursday, November 15, 2018 at 1:58 PM
To: "Thakrar, Jayesh" <>
Cc: Shahbaz <>, user <>, David Markovitz
Subject: Re: How to address seemingly low core utilization on a spark workload?

Small update: my initial estimate was incorrect. I have one location with 16 * 4G = 64G of parquets
(in snappy) + 20 * 5G = 100G of parquets. So a total of 164G.

I am running on Databricks.
Here are some settings:

spark.executor.extraJavaOptions=-XX:ReservedCodeCacheSize=256m -XX:+UseCodeCacheFlushing -Ddatabricks.serviceName=spark-executor-1
-javaagent:/databricks/DatabricksAgent.jar -XX:+PrintFlagsFinal -XX:+PrintGCDateStamps -verbose:gc
-XX:+PrintGCDetails -Xss4m

These are the only relevant settings that I see set when looking at the logs. I am guessing
this means that the others are simply set to their defaults.
Are there any settings I should pay special attention to? (A reference is also good.)

My assumption is that the Databricks runtime is already preconfigured with known best practices
(like cores per executor...). Now that I think of it, I need to validate this assumption.

On Thu, Nov 15, 2018 at 9:14 PM Thakrar, Jayesh <<>>
While there is some merit to that thought process, I would steer away from premature JVM GC
optimization of this kind.
What are the memory, cpu and other settings (e.g. any JVM/GC settings) for the executors and
So assuming that you are reading about 16 files of say 2-4 GB each, that’s about 32-64 GB
of (compressed?) data in parquet files.
Do you have access to the Spark UI – what is the peak memory that you see for the executors?
The UI will also give you the time spent on GC by each executor.
So even if you completely eliminated all GC, that’s the max time you can potentially save.
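
The upper bound described here can be written down directly. This is a minimal sketch: the function name and the example numbers (3600 s of task time, 180 s of GC) are hypothetical placeholders for whatever the Spark UI reports per executor.

```python
def gc_savings_bound(task_time_s, gc_time_s):
    """Return (share of task time spent in GC, best-case speedup if GC cost nothing)."""
    if not 0 <= gc_time_s < task_time_s:
        raise ValueError("GC time must be non-negative and smaller than task time")
    fraction = gc_time_s / task_time_s
    speedup = task_time_s / (task_time_s - gc_time_s)
    return fraction, speedup

# Hypothetical executor: one hour of task time, three minutes of it in GC.
fraction, speedup = gc_savings_bound(task_time_s=3600.0, gc_time_s=180.0)
print(f"GC share: {fraction:.1%}, best-case speedup: {speedup:.3f}x")
```

If the GC share turns out to be a few percent, GC tuning cannot explain long stretches of idle cores.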

From: Vitaliy Pisarev <<>>
Date: Thursday, November 15, 2018 at 1:03 PM
To: Shahbaz <<>>
Cc: "Thakrar, Jayesh" <<>>,
user <<>>, "<>"
Subject: Re: How to address seemingly low core utilization on a spark workload?

Agree, and I will try it. One clarification though: the number of partitions also affects
their in-memory size. So fewer partitions may result in higher memory pressure and OOMs.
I think this was the original intention.

So the motivation for partitioning is also to break down volumes to fit the machines.

Is this premise wrong?
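
The trade-off in this message can be put in numbers. A minimal sketch, assuming evenly sized partitions and one running task per core; the 600 GB total and 4 cores per executor are hypothetical inputs, and real tasks need extra working memory on top of their input.

```python
def partition_size_mb(total_gb, num_partitions):
    """Average size of one partition in MB, assuming no skew."""
    return total_gb * 1024 / num_partitions

def concurrent_task_data_gb(total_gb, num_partitions, cores_per_executor):
    """Data held at once by the tasks running on one executor (one task per core)."""
    return partition_size_mb(total_gb, num_partitions) * cores_per_executor / 1024

# Hypothetical: 600 GB shuffled, 4 cores per executor.
for partitions in (300, 3000, 30000):
    size = partition_size_mb(600, partitions)
    held = concurrent_task_data_gb(600, partitions, 4)
    print(f"{partitions:>6} partitions: {size:8.2f} MB each, {held:5.2f} GB in flight per executor")
```

So the premise holds in direction, but the in-flight volume is already small compared with executor memory long before the partition count reaches 30k.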

On Thu, Nov 15, 2018, 19:49 Shahbaz <<>
30k SQL shuffle partitions is extremely high. Core to partition is 1 to 1. The default value of
SQL shuffle partitions is 200; set it to 300 or leave it at the default and see which one gives
the best performance. After you do that, see how the cores are being used.
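
One way to run that comparison is sketched below in PySpark style. `spark` is assumed to be an existing SparkSession and `run_job` a hypothetical callable wrapping the workload; only `spark.conf.set` and the `spark.sql.shuffle.partitions` key are real Spark names here.

```python
import time

def time_with_shuffle_partitions(spark, run_job, candidates=(200, 300)):
    """Run `run_job(spark)` once per candidate value and return {value: seconds}."""
    timings = {}
    for n in candidates:
        # spark.sql.shuffle.partitions is a runtime SQL conf, so it can be changed between runs.
        spark.conf.set("spark.sql.shuffle.partitions", str(n))
        start = time.perf_counter()
        run_job(spark)  # hypothetical: must trigger an action, e.g. a write or a count
        timings[n] = time.perf_counter() - start
    return timings
```

A single run per value is noisy, so repeating each setting a few times before drawing conclusions would be prudent.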


On Thu, Nov 15, 2018 at 10:58 PM Vitaliy Pisarev <<>>
Oh, regarding shuffle.partitions being 30k: I don't know. I inherited the workload from
an engineer that is no longer around and am trying to make sense of things in general.

On Thu, Nov 15, 2018 at 7:26 PM Vitaliy Pisarev <<>>
The quest is dual:

  *   Increase utilisation, because cores cost money and I want to make sure that I fully
utilise what I pay for. This is very blunt of course, because there is always I/O and at least
some degree of skew. Bottom line is to do the same thing over the same time but with fewer (but
better utilised) resources.
  *   Reduce runtime by increasing parallelism.
While not the same, I am looking at these as two sides of the same coin.
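
Both goals can be tied to one ratio: total task time divided by the core-seconds paid for. A minimal sketch with hypothetical numbers (only the 300 cores come from this thread); the function names are illustrative.

```python
import math

def core_utilisation(total_task_seconds, cores, wall_clock_seconds):
    """Fraction of the available core-seconds that were spent running tasks."""
    return total_task_seconds / (cores * wall_clock_seconds)

def cores_needed(total_task_seconds, wall_clock_seconds):
    """Lower bound on cores for the same runtime, ignoring skew and I/O waits."""
    return math.ceil(total_task_seconds / wall_clock_seconds)

# Hypothetical run: 300 cores, one hour wall clock, 432000 s of summed task time.
print(core_utilisation(432_000, 300, 3600))
print(cores_needed(432_000, 3600))
```

The second figure is only a floor: skew and I/O mean the real requirement sits somewhere above it.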

On Thu, Nov 15, 2018 at 6:58 PM Thakrar, Jayesh <<>>
For that little data, I find spark.sql.shuffle.partitions = 30000 to be very high.
Any reason for that high value?

Do you have a baseline observation with the default value?

Also, enabling the jobgroup and job info through the API and observing through the UI will
help you understand the code snippets when you have low utilization.
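
The message does not name the exact calls. A plausible reading, offered here as an assumption, is `SparkContext.setJobGroup`, which labels every job started afterwards from the same thread. The wrapper below is a hypothetical helper; `sc` is assumed to be an existing SparkContext.

```python
def run_labelled(sc, group_id, description, action):
    """Label the jobs triggered by `action()` so they are identifiable in the Spark UI.

    Assumption: `sc` is a SparkContext; `action` is any zero-argument callable
    that triggers Spark jobs (for example a count or a write).
    """
    sc.setJobGroup(group_id, description)
    return action()

# Usage sketch (requires a live SparkContext, so it is left commented out):
# run_labelled(sc, "load", "read and filter input parquet", lambda: df.count())
```

With each step labelled, a dip in the active-task plot can be matched to the code that was running at the time.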

Finally, high utilization does not equate to high efficiency.
It's very likely that for your workload, you may only need 16-128 executors.
I would suggest getting the partition count for the various datasets/dataframes/rdds in your
code by using

dataset.rdd.getNumPartitions
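
In PySpark the same call is `df.rdd.getNumPartitions()`. A small sketch for checking several intermediate DataFrames at once; the helper name and the example keys are hypothetical.

```python
def partition_counts(named_dataframes):
    """Map each name to the partition count of its DataFrame.

    Assumption: every value exposes `.rdd.getNumPartitions()`, as PySpark DataFrames do.
    """
    return {name: df.rdd.getNumPartitions() for name, df in named_dataframes.items()}

# Usage sketch (requires live DataFrames, so it is left commented out):
# print(partition_counts({"raw": raw_df, "filtered": filtered_df, "grouped": grouped_df}))
```

Stages that follow a shuffle should report the configured spark.sql.shuffle.partitions value, while scan stages depend on the input files.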

I would also suggest running a number of tests with different numbers of executors.

But coming back to the objective behind your quest: are you trying to maximize utilization
in the hope that high parallelism will reduce your total runtime?

From: Vitaliy Pisarev <<>>
Date: Thursday, November 15, 2018 at 10:07 AM
To: <<>>
Cc: user <<>>, David Markovitz
Subject: Re: How to address seemingly low core utilization on a spark workload?

I am working with parquets and the metadata reading there is quite fast as there are at most
16 files (a couple of gigs each).

I find it very hard to answer the question "how many partitions do you have?". Many Spark
operations do not preserve partitioning, and I have a lot of filtering and grouping going on.
What I can say is that I set spark.sql.shuffle.partitions to 30,000.

I am not worried that there are not enough partitions to keep the cores working. Having said
that I do see that the high utilisation correlates heavily with shuffle read/write. Whereas
low utilisation correlates with no shuffling.
This leads me to the conclusion that compared to the amount of shuffling, the cluster is doing
very little work.

Question is what can I do about it.

On Thu, Nov 15, 2018 at 5:29 PM Thakrar, Jayesh <<>>
Can you shed more light on what kind of processing you are doing?

One common pattern that I have seen for active core/executor utilization dropping to zero
is while reading ORC data and the driver seems (I think) to be doing schema validation.
In my case I would have hundreds of thousands of ORC data files and there is dead silence
for about 1-2 hours.
I have tried providing a schema and disabling schema validation while reading the ORC data,
but that does not seem to help (Spark 2.2.1).

And as you know, in most cases, there is a linear relationship between number of partitions
in your data and the concurrently active executors.

Another thing I would suggest is use the following two API calls/method – they will annotate
the spark stages and jobs with what is being executed in the Spark UI.

From: Vitaliy Pisarev <<>>
Date: Thursday, November 15, 2018 at 8:51 AM
To: user <<>>
Cc: David Markovitz <<>>
Subject: How to address seemingly low core utilization on a spark workload?

I have a workload that runs on a cluster of 300 cores.
Below is a plot of the amount of active tasks over time during the execution of this workload:

[Plot: number of active tasks over time; image not preserved in the archive]

What I deduce is that there are substantial intervals where the cores are heavily under-utilised.

What actions can I take to:

  *   Increase the efficiency (== core utilisation) of the cluster?
  *   Understand the root causes behind the drops in core utilisation?