spark-user mailing list archives

From Corey Nolet <>
Subject Re: Shuffle memory woes
Date Mon, 08 Feb 2016 03:16:55 GMT

Thank you for chiming in and I'm glad someone else is experiencing this too
and not just me. I know very well how the Spark shuffles work and I've done
deep dive presentations @ Spark meetups in the past. This problem is
something that goes beyond that and, I believe, it exposes a fundamental
paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
it can be fixed.

Also- in regards to how much data actually gets shuffled- believe it or not
this problem can take a 30-40 minute job and make it run for 4 or more
hours. If I let the job run for 4+ hours the amount of data being shuffled
for this particular dataset will be 100 or more TB. Usually, however, I end
up killing the job long before that point because I realize it should not
be taking this long. The particular dataset we're doing is not for
real-time exploration. These are very large joins we're doing for jobs that
we run a few times a day.
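
For a rough sense of scale, the figures quoted elsewhere in this thread (an input of about 100 GB, spills of 10 to 100 TB) can be turned into an amplification factor. This is only a back-of-the-envelope sketch: the function name is made up for illustration, and it assumes decimal units and takes the reported sizes at face value.

```python
def spill_amplification(input_gb: float, spilled_tb: float) -> float:
    """Return how many times larger the spilled volume is than the input."""
    spilled_gb = spilled_tb * 1000  # assumption: decimal units, 1 TB = 1000 GB
    return spilled_gb / input_gb


# Sizes as reported in the thread: ~100 GB of input, 10-100 TB spilled.
for spilled_tb in (10, 100):
    factor = spill_amplification(100, spilled_tb)
    print(f"{spilled_tb} TB spilled from 100 GB of input: {factor:.0f}x")
```

Either factor is far beyond the 3-4x of input that Sea mentions further down as the expected spill volume when memory is short.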

On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <> wrote:

>  "The dataset is 100gb at most, the spills can up to 10T-100T"
> -- I have had the same experiences, although not to this extreme (the
> spills were < 10T while the input was ~ 100s gb) and haven't found any
> solution yet. I don't believe this is related to input data format. In my
> case, I got my input data by loading from Hive tables.
> On Sun, Feb 7, 2016 at 6:28 AM, Sea <> wrote:
>> Hi, Corey:
>>    "The dataset is 100gb at most, the spills can up to 10T-100T". Are
>> your input files in lzo format, and do you use sc.text()? If there is not
>> enough memory, Spark will spill 3-4x the input data to disk.
>> ------------------ Original Message ------------------
>> *From:* "Corey Nolet";<>;
>> *Sent:* Sunday, February 7, 2016, 8:56 PM
>> *To:* "Igor Berman"<>;
>> *Cc:* "user"<>;
>> *Subject:* Re: Shuffle memory woes
>> As for the second part of your questions- we have a fairly complex join
>> process which requires a ton of stage orchestration from our driver. I've
>> written some code to be able to walk down our DAG tree and execute siblings
>> in the tree concurrently where possible (forcing cache to disk on children
>> that have multiple children themselves so that they can be run
>> concurrently). Ultimately, we have seen significant speedup in our jobs by
>> keeping tasks as busy as possible processing concurrent stages. Funny
>> enough though, the stage that is causing problems with shuffling for us has
>> a lot of children and doesn't even run concurrently with any other stages
>> so I ruled out the concurrency of the stages as a culprit for the
>> shuffling problem we're seeing.
>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <> wrote:
>>> Igor,
>>> I don't think the question is "why can't it fit stuff in memory". I know
>>> why it can't fit stuff in memory- because it's a large dataset that needs
>>> to have a reduceByKey() run on it. My understanding is that when it doesn't
>>> fit into memory it needs to spill in order to consolidate intermediary
>>> files into a single file. The more data you need to run through this, the
>>> more it will need to spill. My finding is that once it gets stuck in this
>>> spill chain with our dataset it's all over @ that point because it will
>>> spill and spill and spill and spill and spill. If I give the shuffle enough
>>> memory it won't- irrespective of the number of partitions we have (I've
>>> done everything from repartition(500) to repartition(2500)). It's not a
>>> matter of running out of memory on a single node because the data is
>>> skewed. It's more a matter of the shuffle buffer filling up and needing to
>>> spill. I think what may be happening is that it gets to a point where it's
>>> spending more time reading/writing from disk while doing the spills than it
>>> is actually processing any data. I can tell this because I can see that the
>>> spills sometimes get up into the 10's to 100's of TB where the input data
>>> was maybe 100gb at most. Unfortunately my code is on a
>>> private internal network and I'm not able to share it.
>>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <>
>>> wrote:
>>>> so can you provide code snippets: it would be especially interesting to
>>>> see what your transformation chain is and how many partitions there are
>>>> on each side of the shuffle operation
>>>> the question is why it can't fit stuff in memory when you are shuffling
>>>> - maybe your partitioner on "reduce" side is not configured properly? I
>>>> mean if map side is ok, and you just reducing by key or something it should
>>>> be ok, so some detail is missing...skewed data? aggregate by key?
>>>> On 6 February 2016 at 20:13, Corey Nolet <> wrote:
>>>>> Igor,
>>>>> Thank you for the response but unfortunately, the problem I'm
>>>>> referring to goes beyond this. I have set the shuffle memory fraction
>>>>> to be 90% and set the cache memory to be 0. Repartitioning the RDD
>>>>> helped a bit on the map side but didn't do much for the spilling when
>>>>> there was no longer any memory left for the shuffle. Also the new
>>>>> auto-memory management doesn't seem like it'll have too much of an
>>>>> effect after I've already given most of the memory I've allocated to
>>>>> the shuffle. The problem I'm having is most specifically related to
>>>>> the shuffle performance declining by several orders of magnitude when
>>>>> it needs to spill multiple times (it ends up spilling several hundred
>>>>> times for me when it can't fit stuff into memory).
>>>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <>
>>>>> wrote:
>>>>>> Hi,
>>>>>> usually you can solve this in 2 steps:
>>>>>> 1. make the RDD have more partitions
>>>>>> 2. play with the shuffle memory fraction
>>>>>> in Spark 1.6, cache vs. shuffle memory fractions are adjusted
>>>>>> automatically
>>>>>> On 5 February 2016 at 23:07, Corey Nolet <> wrote:
>>>>>>> I just recently had a discovery that my jobs were taking several
>>>>>>> hours to complete because of excess shuffle spills. What I found was
>>>>>>> that when I hit the high point where I didn't have enough memory for
>>>>>>> shuffles to store all of their file consolidations at once, it could
>>>>>>> spill so many times that it causes my job's runtime to increase by
>>>>>>> orders of magnitude (and sometimes fail altogether).
>>>>>>> I've played with all the tuning parameters I can find. To speed
>>>>>>> shuffles up, I tuned the akka threads to different values. I also
>>>>>>> tuned the shuffle buffering a tad (both up and down).
>>>>>>> I feel like I see a weak point here. The mappers are sharing
>>>>>>> space with reducers and the shuffles need enough memory to consolidate
>>>>>>> and pull, otherwise they will need to spill and spill and spill. What
>>>>>>> I've noticed about my jobs is that this is a difference between them
>>>>>>> taking 30 minutes and 4 hours or more. Same job- just different memory.
>>>>>>> I've found that, as a result of the spilling, I'm better off not
>>>>>>> caching any data in memory and lowering my storage fraction to 0 and
>>>>>>> still hoping I was able to give my shuffles enough memory that my data
>>>>>>> doesn't continuously spill. Is this the way it's supposed to be? It
>>>>>>> makes it hard because it seems like it forces the memory limits on my
>>>>>>> job- otherwise it could take orders of magnitude longer to execute.
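
The memory settings described in this thread (shuffle fraction at 90%, storage fraction at 0) could be passed to spark-submit roughly as sketched below. This is an illustration, not the poster's actual configuration: the helper name is hypothetical, and it assumes the fixed-fraction properties `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction`, which Spark 1.6 only reads when `spark.memory.useLegacyMode` is enabled.

```python
from typing import List


def legacy_memory_conf(shuffle_fraction: float, storage_fraction: float) -> List[str]:
    """Build spark-submit --conf flags for the fixed (pre-1.6 style) memory fractions."""
    settings = {
        # Assumption: running on Spark 1.6, where the two fractions below are
        # ignored unless legacy memory management is switched on.
        "spark.memory.useLegacyMode": "true",
        "spark.shuffle.memoryFraction": str(shuffle_fraction),
        "spark.storage.memoryFraction": str(storage_fraction),
    }
    flags: List[str] = []
    for key, value in settings.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


# Values taken from the thread: 90% for shuffle, nothing reserved for caching.
print(" ".join(legacy_memory_conf(0.9, 0.0)))
```

With the automatic memory management that Igor mentions for Spark 1.6, these fixed fractions are replaced by a shared pool, so the flags above only matter when the legacy mode is explicitly chosen.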
