spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Charles Chao <xpnc54byp...@gmail.com>
Subject Re: Shuffle memory woes
Date Mon, 08 Feb 2016 02:56:41 GMT
 "The dataset is 100gb at most, the spills can up to 10T-100T"

-- I have had the same experiences, although not to this extreme (the
spills were < 10T while the input was ~ 100s gb) and haven't found any
solution yet. I don't believe this is related to input data format. in my
case, I got my input data by loading from Hive tables.

On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810726@qq.com> wrote:

> Hi,Corey:
>    "The dataset is 100gb at most, the spills can up to 10T-100T", Are your
> input files lzo format, and you use sc.text() ? If memory is not enough,
> spark will spill 3-4x of input data to disk.
>
>
> ------------------ 原始邮件 ------------------
> *发件人:* "Corey Nolet";<cjnolet@gmail.com>;
> *发送时间:* 2016年2月7日(星期天) 晚上8:56
> *收件人:* "Igor Berman"<igor.berman@gmail.com>;
> *抄送:* "user"<user@spark.apache.org>;
> *主题:* Re: Shuffle memory woes
>
> As for the second part of your questions- we have a fairly complex join
> process which requires a ton of stage orchestration from our driver. I've
> written some code to be able to walk down our DAG tree and execute siblings
> in the tree concurrently where possible (forcing cache to disk on children
> that that have multiple chiildren themselves so that they can be run
> concurrently). Ultimatey, we have seen significant speedup in our jobs by
> keeping tasks as busy as possible processing concurrent stages. Funny
> enough though, the stage that is causing problems with shuffling for us has
> a lot of children and doesn't even run concurrently with any other stages
> so I ruled out the concurrency of the stages as a culprit for the
> shuffliing problem we're seeing.
>
> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjnolet@gmail.com> wrote:
>
>> Igor,
>>
>> I don't think the question is "why can't it fit stuff in memory". I know
>> why it can't fit stuff in memory- because it's a large dataset that needs
>> to have a reduceByKey() run on it. My understanding is that when it doesn't
>> fit into memory it needs to spill in order to consolidate intermediary
>> files into a single file. The more data you need to run through this, the
>> more it will need to spill. My findings is that once it gets stuck in this
>> spill chain with our dataset it's all over @ that point because it will
>> spill and spill and spill and spill and spill. If I give the shuffle enough
>> memory it won't- irrespective of the number of partitions we have (i've
>> done everything from repartition(500) to repartition(2500)). It's not a
>> matter of running out of memory on a single node because the data is
>> skewed. It's more a matter of the shuffle buffer filling up and needing to
>> spill. I think what may be happening is that it gets to a point where it's
>> spending more time reading/writing from disk while doing the spills then it
>> is actually processing any data. I can tell this because I can see that the
>> spills sometimes get up into the 10's to 100's of TB where the input data
>> was maybe acquireExecutionMemory at most. Unfortunately my code is on a
>> private internal network and I'm not able to share it.
>>
>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.berman@gmail.com>
>> wrote:
>>
>>> so can you provide code snippets: especially it's interesting to see
>>> what are your transformation chain, how many partitions are there on each
>>> side of shuffle operation
>>>
>>> the question is why it can't fit stuff in memory when you are shuffling
>>> - maybe your partitioner on "reduce" side is not configured properly? I
>>> mean if map side is ok, and you just reducing by key or something it should
>>> be ok, so some detail is missing...skewed data? aggregate by key?
>>>
>>> On 6 February 2016 at 20:13, Corey Nolet <cjnolet@gmail.com> wrote:
>>>
>>>> Igor,
>>>>
>>>> Thank you for the response but unfortunately, the problem I'm referring
>>>> to goes beyond this. I have set the shuffle memory fraction to be 90% and
>>>> set the cache memory to be 0. Repartitioning the RDD helped a tad on the
>>>> map side but didn't do much for the spilling when there was no longer any
>>>> memory left for the shuffle. Also the new auto-memory management doesn't
>>>> seem like it'll have too much of an effect after i've already given most
>>>> the memory i've allocated to the shuffle. The problem I'm having is most
>>>> specifically related to the shuffle performing declining by several orders
>>>> of magnitude when it needs to spill multiple times (it ends up spilling
>>>> several hundred for me when it can't fit stuff into memory).
>>>>
>>>>
>>>>
>>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.berman@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> usually you can solve this by 2 steps
>>>>> make rdd to have more partitions
>>>>> play with shuffle memory fraction
>>>>>
>>>>> in spark 1.6 cache vs shuffle memory fractions are adjusted
>>>>> automatically
>>>>>
>>>>> On 5 February 2016 at 23:07, Corey Nolet <cjnolet@gmail.com> wrote:
>>>>>
>>>>>> I just recently had a discovery that my jobs were taking several
>>>>>> hours to completely because of excess shuffle spills. What I found
was that
>>>>>> when I hit the high point where I didn't have enough memory for the
>>>>>> shuffles to store all of their file consolidations at once, it could
spill
>>>>>> so many times that it causes my job's runtime to increase by orders
of
>>>>>> magnitude (and sometimes fail altogether).
>>>>>>
>>>>>> I've played with all the tuning parameters I can find. To speed the
>>>>>> shuffles up, I tuned the akka threads to different values. I also
tuned the
>>>>>> shuffle buffering a tad (both up and down).
>>>>>>
>>>>>> I feel like I see a weak point here. The mappers are sharing memory
>>>>>> space with reducers and the shuffles need enough memory to consolidate
and
>>>>>> pull otherwise they will need to spill and spill and spill. What
i've
>>>>>> noticed about my jobs is that this is a difference between them taking
30
>>>>>> minutes and 4 hours or more. Same job- just different memory tuning.
>>>>>>
>>>>>> I've found that, as a result of the spilling, I'm better off not
>>>>>> caching any data in memory and lowering my storage fraction to 0
and still
>>>>>> hoping I was able to give my shuffles enough memory that my data
doesn't
>>>>>> continuously spill. Is this the way it's supposed to be? It makes
it hard
>>>>>> because it seems like it forces the memory limits on my job- otherwise
it
>>>>>> could take orders of magnitude longer to execute.
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message