spark-user mailing list archives

From Jestin Ma <jestinwith.a...@gmail.com>
Subject Re:
Date Sun, 14 Aug 2016 06:17:24 GMT
Attached are the screenshots mentioned; apologies for that.

On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.an.e@gmail.com>
wrote:

> Hi, I'm currently trying to perform an outer join between two
> DataFrames/Datasets, one ~150 GB and the other ~50 GB, on a column, id.
>
> df1.id is skewed in that there are many 0's, the rest being unique IDs.
>
> df2.id is not skewed. If I filter df1 to id != 0, the join works well.
> If I don't, the join does not complete even after a very long time.
>
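>
> For concreteness, the join looks roughly like this (df1, df2, and id as
> described; only a sketch):
>
>     // full outer join on the skewed column
>     val joined = df1.join(df2, Seq("id"), "outer")
>
>     // completes in reasonable time once the skewed key is filtered out:
>     val noSkew = df1.filter("id != 0").join(df2, Seq("id"), "outer")
>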
> I have traced this problem to the hash partitioning on IDs: because of
> the skew, one partition ends up containing a large share of the values.
> One executor ends up reading most of the shuffle data and writing all of
> the shuffle data, as shown below.
>
> [screenshot]
>
> Shown above is the task in question, assigned to one executor.
>
> [screenshot]
>
> This screenshot comes from one of the executors, showing a single thread
> spilling sort data, since the executor cannot hold 90%+ of the ~200 GB
> result in memory.
>
> Moreover, looking at the event timeline, I see that the executor on that
> task spends about 20% of its time reading shuffle data, 70% on
> computation, and 10% writing output data.
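>
> (The skew also shows up directly in a quick per-key count, something
> like:)
>
>     // row counts per join key, largest first
>     import org.apache.spark.sql.functions.desc
>     df1.groupBy("id").count().orderBy(desc("count")).show(5)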
>
> I have tried the following:
>
>
>    - "Salting" the 0-value keys by monotonically_increasing_id().mod(N)
>    - - This doesn't seem to have an effect since now I have
>    hundreds/thousands of keys with tens of thousands of occurrences.
>    - - Should I increase N? Is there a way to just do random.mod(N)
>    instead of monotonically_increasing_id()?
>    -
>    - Repartitioning according to column I know contains unique values
>    -
>    - - This is overridden by Spark's sort-based shuffle manager which
>    hash repartitions on the skewed column
>    -
>    - - Is it possible to change this? Or will the join column need to be
>    hashed and partitioned on for joins to work
>    -
>    - Broadcasting does not work for my large tables
>    -
>    - Increasing/decreasing spark.sql.shuffle.partitions does not remedy
>    the skewed data problem as 0-product values are still being hashed to the
>    same partition.
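>
> To make the salting question concrete, what I have in mind is roughly the
> sketch below, using a random salt instead of monotonically_increasing_id()
> (N is a placeholder value; the outer-join caveat is in the comments):
>
>     import org.apache.spark.sql.functions._
>
>     val N = 100  // number of salt buckets; would need tuning
>
>     // skewed side: random salt in [0, N) per row
>     val df1Salted = df1.withColumn("salt", (rand() * N).cast("int"))
>
>     // other side: replicate each row once per salt value
>     val saltValues = array((0 until N).map(i => lit(i)): _*)
>     val df2Salted = df2.withColumn("salt", explode(saltValues))
>
>     // join on (id, salt) so the 0-keys spread across N partitions;
>     // note: in a full outer join, df2 rows whose key never appears in
>     // df1 would come back N times with nulls and need de-duplicating
>     val joined = df1Salted.join(df2Salted, Seq("id", "salt"), "outer")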
>
>
> ----------------------------------
>
> What I am considering currently is doing the join at the RDD level
> (roughly the sketch below), but is there any level of control there that
> can solve my skewed-data problem? Other than that, see the questions in
> the list above.
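>
> Roughly what I mean by an RDD-level join (a sketch only; getAs[Long]
> assumes id is a long in the actual schema):
>
>     import org.apache.spark.HashPartitioner
>
>     // pair RDDs keyed by id, keeping the whole Row as the value
>     val left  = df1.rdd.map(r => (r.getAs[Long]("id"), r))
>     val right = df2.rdd.map(r => (r.getAs[Long]("id"), r))
>
>     // explicit partitioner and partition count; note this is still
>     // hash-based, so the 0-keys still collide unless salted as well
>     val joined = left.fullOuterJoin(right, new HashPartitioner(2000))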
>
> I would appreciate any suggestions/tips/experience with this. Thank you!
>
>
