Does anyone have tips or solutions at hand? Thanks!
Bolding for the skim-readers, I'm not shouting ;)
Here is my problem, with an example setup:
Mesos 1.3.1, Spark 2.1.1
Coarse-grained mode, dynamic allocation off, external shuffle service off
spark.executor.cores=8 (machines have 32 cores)
spark.executor.memory=50G (machines have 250G of RAM)
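For reference, the setup above could be written as Spark configuration properties roughly like this. It is a sketch: the property names are standard Spark settings, but the mapping of the setup to these keys is my reading of it, and spark.task.cpus=2 (the setting added later for stage 1) is included for completeness. The SparkConf part only runs where pyspark is installed.

```python
# Sketch of the cluster setup as Spark configuration properties.
# Assumption: values mirror the setup described above; spark.task.cpus=2
# is the setting that was added later to tame stage 1.
SETUP_CONF = {
    "spark.mesos.coarse": "true",                # coarse-grained Mesos mode
    "spark.dynamicAllocation.enabled": "false",  # dynamic allocation off
    "spark.shuffle.service.enabled": "false",    # external shuffle service off
    "spark.executor.cores": "8",                 # machines have 32 cores
    "spark.executor.memory": "50g",              # machines have 250G of RAM
    "spark.task.cpus": "2",
}


def build_spark_conf(conf=SETUP_CONF):
    """Return a pyspark SparkConf holding the properties above (needs pyspark)."""
    from pyspark import SparkConf  # imported lazily so the dict works without pyspark

    return SparkConf().setAll(list(conf.items()))
```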
Stage 1 goes okay-ish after setting spark.task.cpus=2. Without this setting, there were 8 Python processes per executor (using 8 CPUs) plus 2-4 CPUs for the Java processes, ending up with 10-14 cores per executor instead of 8. I believe this setting is enough to absorb the JVM overhead.
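The CPU budget reasoning can be sketched as follows. The assumption (not measured) is that each concurrently running task keeps one Python worker busy on about one CPU, and whatever is left of the 8-core executor budget is available for the JVM's 2-4 CPUs.

```python
def concurrent_tasks(executor_cores: int, task_cpus: int) -> int:
    """Tasks Spark runs at once on one executor (one Python worker each)."""
    return executor_cores // task_cpus


def jvm_headroom(executor_cores: int, task_cpus: int, cpus_per_worker: float = 1.0) -> float:
    """CPUs left in the executor's budget after the Python workers.

    Assumption: each running task keeps one Python worker busy on about
    `cpus_per_worker` CPUs; the remainder is what the JVM overhead can use.
    """
    return executor_cores - concurrent_tasks(executor_cores, task_cpus) * cpus_per_worker


for task_cpus in (1, 2):
    print(task_cpus, concurrent_tasks(8, task_cpus), jvm_headroom(8, task_cpus))
# task.cpus=1: 8 workers, 0.0 CPUs left for the JVM's 2-4 CPUs, so the executor overshoots
# task.cpus=2: 4 workers, 4.0 CPUs left, enough for 2-4 JVM CPUs
```

The same arithmetic gives the "4 Python processes per executor" that stage 2 would be expected to use.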
df = spark.read.parquet(path)
# key each row by its first column, keep the remaining columns as the value
grpd = df.rdd.map(lambda x: (x[0], list(x[1:]))).groupByKey()
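What the map + groupByKey step produces can be illustrated locally, without Spark, on a few hypothetical rows (plain tuples stand in for Row objects, which are tuples as well). The sketch assumes the grouping key is the first column, x[0].

```python
from collections import defaultdict


def group_rows(rows):
    """Local, in-memory analogue of
    rdd.map(lambda x: (x[0], list(x[1:]))).groupByKey().

    Assumption: the first column is the grouping key. In Spark, groupByKey
    moves every (key, value) pair through the shuffle without map-side
    combining, which likely explains the large shuffle write. Spark also
    does not guarantee the order of values within a group.
    """
    groups = defaultdict(list)
    for row in rows:
        key, rest = row[0], list(row[1:])
        groups[key].append(rest)
    return dict(groups)


rows = [("a", 1, 2.0), ("b", 3, 4.0), ("a", 5, 6.0)]  # hypothetical rows
print(group_rows(rows))
# {'a': [[1, 2.0], [5, 6.0]], 'b': [[3, 4.0]]}
```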
This stage runs for about 3 hours and writes 990G of shuffle data.
Stage 2 is, roughly speaking, a mapValues(...) over grpd, which runs far more than 4 Python processes per executor (sometimes dozens!). Four would be the expected number, given 8 executor cores with spark.task.cpus=2. This stage runs for about 15 hours.
We are fairly sure that the function passed to mapValues does not use multiprocessing itself. If it did, we would probably also see single Python processes using more than 100% CPU, which we never observe.
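To count how many Python processes an executor host is actually running, one option on Linux is to scan /proc. This is a sketch under an assumption worth double-checking: PySpark's Python workers are forked from a `python -m pyspark.daemon` process and, since they are forked rather than exec'd, keep `pyspark.daemon` in their command line. The count therefore includes the daemon process(es) as well as the workers.

```python
import os


def read_cmdlines(proc_root: str = "/proc"):
    """Yield (pid, command line) for every readable process (Linux only)."""
    for entry in os.listdir(proc_root):
        if not entry.isdigit():
            continue
        try:
            with open(os.path.join(proc_root, entry, "cmdline"), "rb") as f:
                raw = f.read()
        except OSError:  # process exited meanwhile or is not readable
            continue
        yield int(entry), raw.replace(b"\0", b" ").decode(errors="replace").strip()


def count_pyspark_processes(cmdlines, marker: str = "pyspark.daemon") -> int:
    """Count processes whose command line contains `marker`.

    Assumption: workers forked from pyspark.daemon keep its command line,
    so the result is workers plus daemon(s).
    """
    return sum(1 for _, cmd in cmdlines if marker in cmd)


if __name__ == "__main__" and os.path.isdir("/proc"):
    print(count_pyspark_processes(read_cmdlines()))
```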
Unfortunately, these Spark tasks then exceed their allocated Mesos resources by 100-150% (hitting the physical limit of the machine).
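A quick back-of-the-envelope check of what a 100-150% overshoot means in cores. The assumption here is that up to 32 // 8 = 4 executors get packed onto one machine; the actual placement may differ.

```python
def actual_cpus(allocated: float, overuse_pct: float) -> float:
    """CPUs actually used when usage exceeds the allocation by overuse_pct percent."""
    return allocated * (1 + overuse_pct / 100)


EXECUTOR_CORES = 8
MACHINE_CORES = 32
# Assumption: up to 32 // 8 = 4 executors share one machine.
executors_per_machine = MACHINE_CORES // EXECUTOR_CORES

for pct in (100, 150):
    per_executor = actual_cpus(EXECUTOR_CORES, pct)
    print(pct, per_executor, per_executor * executors_per_machine)
# 100 16.0 64.0
# 150 20.0 80.0
```

Under that assumption, the demand (64-80 cores) is well above the 32 physical cores, consistent with hitting the machine's limit.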
Any tips are much appreciated!