spark-user mailing list archives

From Jestin Ma <>
Subject [No Subject]
Date Sun, 14 Aug 2016 06:15:43 GMT
Hi, I'm currently trying to perform an outer join between two
DataFrames/Sets, one ~150 GB and one ~50 GB, on a column, id. The id
column in one table is skewed: it contains many 0's, with the rest being
unique IDs. The other table's id column is not skewed. If I filter out
the rows where id == 0, the join works well. If I don't, then the join
does not complete for a very, very long time.

I have diagnosed this problem as being caused by the hash partitioning
on id, which, due to the data skew, results in one partition containing
most of the values. One executor ends up reading most of the shuffle data
and writing all of the shuffle data, as shown below.

[Screenshot: the task in question, assigned to a single executor.]

This screenshot, from one of the executors, shows a single thread
spilling sort data, since the executor cannot hold 90%+ of the ~200 GB
result in memory.

Moreover, looking at the event timeline, I find that the executor on
that task spends about 20% of its time reading shuffle data, 70% on
computation, and 10% writing output data.
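To make the diagnosis concrete, here is a small plain-Python simulation (not Spark; the key distribution and partition count are made up for illustration) of how hash partitioning sends every 0-valued key to the same partition while the unique IDs spread out evenly:

```python
from collections import Counter

def hash_partition(key, num_partitions):
    # Mimics hash-based shuffle partitioning: partition = hash(key) mod N.
    return hash(key) % num_partitions

# Hypothetical skewed key distribution: many 0s, the rest unique IDs.
keys = [0] * 100_000 + list(range(1, 10_001))

partition_sizes = Counter(hash_partition(k, 200) for k in keys)

# The partition that receives the 0 keys holds at least 100_000 rows,
# while every other partition holds only its share of the unique IDs.
largest = max(partition_sizes.values())
print(largest)
```

Every row with key 0 maps to the same partition id, so no number of shuffle partitions can split that one key's rows apart.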

I have tried the following:

   - "Salting" the 0-value keys with monotonically_increasing_id().mod(N)
      - This doesn't seem to have an effect, since I now have
        hundreds/thousands of keys, each with tens of thousands of
        occurrences.
      - Should I increase N? Is there a way to just do random.mod(N)
        instead of monotonically_increasing_id()?
   - Repartitioning on a column I know contains unique values
      - This is overridden by Spark's sort-based shuffle manager, which
        hash-partitions on the skewed join column.
      - Is it possible to change this? Or will the join column always
        need to be hash-partitioned on for joins to work?
   - Broadcasting does not work for my large tables.
   - Increasing/decreasing spark.sql.shuffle.partitions does not remedy
     the skewed data problem, as the 0-valued keys are still being hashed
     to the same partition.
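On the random-salt question: in Spark SQL a per-row random salt can be generated with rand() instead of monotonically_increasing_id(), e.g. (rand() * N).cast("int") in PySpark (pyspark.sql.functions.rand). The part that usually matters, though, is replicating the small side once per salt value so salted keys still match. Below is a plain-Python sketch of that salt-and-replicate pattern (the function names and the value of N are illustrative, not from the original message):

```python
import random

N = 8  # number of salt buckets; tune to the skew (illustrative, not a Spark default)

def salt_big_side(rows):
    # rows: (id, payload) pairs. Append a random salt to the key so the
    # 0-valued rows spread across N composite keys instead of one.
    return [((id_, random.randrange(N)), payload) for id_, payload in rows]

def explode_small_side(rows):
    # Replicate each small-side row once per salt value so every salted
    # big-side key still finds its match.
    return [((id_, s), payload) for id_, payload in rows for s in range(N)]

def join(left, right):
    # Plain hash join on the composite (id, salt) key.
    index = {}
    for k, v in right:
        index.setdefault(k, []).append(v)
    return [(k[0], lv, rv) for k, lv in left for rv in index.get(k, [])]

big = [(0, f"big{i}") for i in range(1000)] + [(7, "big_x")]
small = [(0, "small_zero"), (7, "small_seven")]

result = join(salt_big_side(big), explode_small_side(small))
print(len(result))  # 1001: every big-side row still finds its match
```

The cost is an N-fold blow-up of the small side, which is why N is kept just large enough to break up the hot partition.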


What I am currently considering is doing the join at the RDD level, but
is there any level of control there that could solve my skewed data
problem? Other than that, please see the questions above.
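As for RDD-level control: one pattern I've seen (sketched here in plain Python with lists standing in for RDDs, and shown as an inner join for brevity) is to split the join in two: handle the single skewed key with a broadcast-style join that needs no shuffle, shuffle-join the remaining keys normally, then union the results:

```python
def skew_split_join(big, small, skewed_key=0):
    # big, small: lists of (id, payload) pairs standing in for RDDs.
    # 1. Broadcast-style join for the skewed key: the small side's rows
    #    for that key are tiny, so they can be shipped to every task
    #    instead of shuffling the big side's hot rows to one place.
    small_skewed = [v for k, v in small if k == skewed_key]
    skewed_part = [(k, bv, sv) for k, bv in big if k == skewed_key
                   for sv in small_skewed]

    # 2. Ordinary hash join for the remaining (non-skewed) keys.
    index = {}
    for k, v in small:
        if k != skewed_key:
            index.setdefault(k, []).append(v)
    rest = [(k, bv, sv) for k, bv in big if k != skewed_key
            for sv in index.get(k, [])]

    return skewed_part + rest

big = [(0, "b0a"), (0, "b0b"), (1, "b1"), (2, "b2")]
small = [(0, "s0"), (1, "s1")]
result = skew_split_join(big, small)
print(result)
```

In real Spark this would be a filter on the skewed key, a broadcast of the small side's matching rows, and a union with the regular join of the filtered remainder.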

I would appreciate any suggestions/tips/experience with this. Thank you!
