spark-user mailing list archives

From Corey Nolet <>
Subject Re: Shuffle memory woes
Date Mon, 08 Feb 2016 13:24:55 GMT
I sure do! [1] And yes- I'm really hoping they will chime in, otherwise I
may dig a little deeper myself and start posting some jira tickets.


On Mon, Feb 8, 2016 at 3:02 AM, Igor Berman <> wrote:

> It's interesting to see what spark dev people will say.
> Corey do you have presentation available online?
> On 8 February 2016 at 05:16, Corey Nolet <> wrote:
>> Charles,
>> Thank you for chiming in and I'm glad someone else is experiencing this
>> too and not just me. I know very well how the Spark shuffles work and I've
>> done deep dive presentations @ Spark meetups in the past. This problem is
>> something that goes beyond that and, I believe, it exposes a fundamental
>> paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
>> it can be fixed.
>> Also- in regards to how much data actually gets shuffled- believe it or
>> not this problem can take a 30-40 minute job and make it run for 4 or more
>> hours. If I let the job run for 4+ hours the amount of data being shuffled
>> for this particular dataset will be 100 or more TB. Usually, however, I end
>> up killing the job long before that point because I realize it should not
>> be taking this long. The particular dataset we're doing is not for
>> real-time exploration. These are very large joins we're doing for jobs that
>> we run a few times a day.
>> On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <>
>> wrote:
>>>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>>> -- I have had the same experiences, although not to this extreme (the
>>> spills were < 10T while the input was ~ 100s gb) and haven't found any
>>> solution yet. I don't believe this is related to input data format. In my
>>> case, I got my input data by loading from Hive tables.
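
The figures quoted above can be put in perspective with a little arithmetic. The sketch below is only illustrative: it assumes decimal units (1 TB = 1000 GB) and uses the thread's round numbers (about 100 GB of input, 10 to 100 TB of spill). The function name is hypothetical and not part of Spark.

```python
GB_PER_TB = 1000  # assumption: decimal units


def spill_amplification(input_gb, spilled_tb):
    """Return how many times larger the spilled volume is than the input."""
    if input_gb <= 0:
        raise ValueError("input_gb must be positive")
    return spilled_tb * GB_PER_TB / input_gb


# Round numbers from the thread: ~100 GB of input, 10 to 100 TB spilled.
for spilled_tb in (10, 100):
    ratio = spill_amplification(100, spilled_tb)
    print("{} TB spilled from 100 GB input: {:.0f}x".format(spilled_tb, ratio))
```

Under these assumptions the spill volume is two to three orders of magnitude larger than the input, far beyond the 3-4x figure mentioned elsewhere in the thread.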
>>> On Sun, Feb 7, 2016 at 6:28 AM, Sea <> wrote:
>>>> Hi, Corey:
>>>>    "The dataset is 100gb at most, the spills can up to 10T-100T". Are
>>>> your input files in lzo format, and do you use sc.text()? If there is not
>>>> enough memory, spark will spill 3-4x the input data to disk.
>>>> ------------------ Original message ------------------
>>>> *From:* "Corey Nolet";<>;
>>>> *Sent:* Sunday, February 7, 2016, 8:56 PM
>>>> *To:* "Igor Berman"<>;
>>>> *Cc:* "user"<>;
>>>> *Subject:* Re: Shuffle memory woes
>>>> As for the second part of your questions- we have a fairly complex join
>>>> process which requires a ton of stage orchestration from our driver. I've
>>>> written some code to be able to walk down our DAG tree and execute siblings
>>>> in the tree concurrently where possible (forcing cache to disk on children
>>>> that have multiple children themselves so that they can be run
>>>> concurrently). Ultimately, we have seen significant speedup in our jobs by
>>>> keeping tasks as busy as possible processing concurrent stages. Funny
>>>> enough though, the stage that is causing problems with shuffling for us has
>>>> a lot of children and doesn't even run concurrently with any other stages
>>>> so I ruled out the concurrency of the stages as a culprit for the
>>>> shuffling problem we're seeing.
>>>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <> wrote:
>>>>> Igor,
>>>>> I don't think the question is "why can't it fit stuff in memory". I
>>>>> know why it can't fit stuff in memory- because it's a large dataset that
>>>>> needs to have a reduceByKey() run on it. My understanding is that when it
>>>>> doesn't fit into memory it needs to spill in order to consolidate
>>>>> intermediary files into a single file. The more data you need to run
>>>>> through this, the more it will need to spill. My finding is that once it
>>>>> gets stuck in this spill chain with our dataset it's all over @ that
>>>>> point because it will spill and spill and spill and spill and spill. If I
>>>>> give the shuffle enough memory it won't- irrespective of the number of
>>>>> partitions we have (I've done everything from repartition(500) to
>>>>> repartition(2500)). It's not a matter of running out of memory on a single
>>>>> node because the data is skewed. It's more a matter of the shuffle buffer
>>>>> filling up and needing to spill. I think what may be happening is that it
>>>>> gets to a point where it's spending more time reading/writing from disk
>>>>> while doing the spills than it is actually processing any data. I can
>>>>> tell this because I can see that the spills sometimes get up into the 10's
>>>>> or 100's of TB where the input data was maybe 100gb at
>>>>> most. Unfortunately my code is on a private internal network and I'm
>>>>> not able to share it.
>>>>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <>
>>>>> wrote:
>>>>>> so can you provide code snippets: especially it's interesting to see
>>>>>> what your transformation chain is and how many partitions there are on
>>>>>> each side of the shuffle operation
>>>>>> the question is why it can't fit stuff in memory when you are
>>>>>> shuffling - maybe your partitioner on "reduce" side is not configured
>>>>>> properly? I mean if map side is ok, and you are just reducing by key or
>>>>>> something it should be ok, so some detail is missing...skewed data?
>>>>>> aggregate by key?
>>>>>> On 6 February 2016 at 20:13, Corey Nolet <> wrote:
>>>>>>> Igor,
>>>>>>> Thank you for the response but unfortunately, the problem I'm
>>>>>>> referring to goes beyond this. I have set the shuffle memory fraction
>>>>>>> to be 90% and set the cache memory to be 0. Repartitioning the RDD
>>>>>>> helped a tad on the map side but didn't do much for the spilling when
>>>>>>> there was no longer any memory left for the shuffle. Also the new
>>>>>>> auto-memory doesn't seem like it'll have too much of an effect after
>>>>>>> I've already given most of the memory I've allocated to the shuffle.
>>>>>>> The problem I'm having is most specifically related to the shuffle
>>>>>>> performance declining by several orders of magnitude when it needs to
>>>>>>> spill multiple times (it ends up spilling several hundred for me when
>>>>>>> it can't fit stuff into memory).
>>>>>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <>
>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>> usually you can solve this in 2 steps:
>>>>>>>> 1. make the rdd have more partitions
>>>>>>>> 2. play with the shuffle memory fraction
>>>>>>>> in spark 1.6 cache vs shuffle memory fractions are adjusted
>>>>>>>> automatically
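
As a rough illustration of the "shuffle memory fraction" tuning discussed in this thread, here is a minimal sketch that builds `spark-submit` `--conf` flags for the Spark 1.x static memory settings `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction`. In Spark 1.6 these legacy settings are only honored when `spark.memory.useLegacyMode` is enabled, so the sketch adds that flag on request. The helper name `legacy_memory_conf` and the example values (0.9 and 0.0, taken from the settings described elsewhere in the thread) are illustrative assumptions, not a recommended configuration.

```python
def legacy_memory_conf(shuffle_fraction, storage_fraction, spark_1_6=False):
    """Build spark-submit --conf flags for the Spark 1.x static memory settings.

    Hypothetical helper for illustration only; it just formats strings.
    """
    for value in (shuffle_fraction, storage_fraction):
        if not 0.0 <= value <= 1.0:
            raise ValueError("fractions must be between 0 and 1")

    conf = {
        "spark.shuffle.memoryFraction": shuffle_fraction,
        "spark.storage.memoryFraction": storage_fraction,
    }
    if spark_1_6:
        # In 1.6 the two fractions above are ignored unless legacy mode is on.
        conf["spark.memory.useLegacyMode"] = "true"

    flags = []
    for key, value in conf.items():
        flags += ["--conf", "{}={}".format(key, value)]
    return flags


# Example: most memory to the shuffle, none to the cache.
print(" ".join(legacy_memory_conf(0.9, 0.0, spark_1_6=True)))
```

The resulting flags would be passed on the `spark-submit` command line; whether such an extreme split helps depends on the job.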
>>>>>>>> On 5 February 2016 at 23:07, Corey Nolet <> wrote:
>>>>>>>>> I just recently had a discovery that my jobs were taking
>>>>>>>>> hours to complete because of excess shuffle spills. What I found was
>>>>>>>>> that when I hit the high point where I didn't have enough memory for
>>>>>>>>> the shuffles to store all of their file consolidations at once, it
>>>>>>>>> could spill so many times that it caused my job's runtime to increase
>>>>>>>>> by orders of magnitude (and sometimes fail altogether).
>>>>>>>>> I've played with all the tuning parameters I can find. To speed
>>>>>>>>> the shuffles up, I tuned the akka threads to different values. I also
>>>>>>>>> tuned the shuffle buffering a tad (both up and down).
>>>>>>>>> I feel like I see a weak point here. The mappers are sharing
>>>>>>>>> memory space with reducers and the shuffles need enough memory to
>>>>>>>>> consolidate and pull otherwise they will need to spill and spill and
>>>>>>>>> spill. What I've noticed about my jobs is that this is the difference
>>>>>>>>> between them taking 30 minutes and 4 hours or more. Same job- just
>>>>>>>>> different memory tuning.
>>>>>>>>> I've found that, as a result of the spilling, I'm better off not
>>>>>>>>> caching any data in memory and lowering my storage fraction to 0 and
>>>>>>>>> still hoping I was able to give my shuffles enough memory that my data
>>>>>>>>> doesn't continuously spill. Is this the way it's supposed to be? It
>>>>>>>>> makes it hard because it seems like it forces the memory limits on my
>>>>>>>>> job- otherwise it could take orders of magnitude longer to execute.
