spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Sea" <261810...@qq.com>
Subject 回复: Shuffle memory woes
Date Sun, 07 Feb 2016 14:28:15 GMT
Hi,Corey:
   "The dataset is 100gb at most, the spills can up to 10T-100T", Are your input files lzo
format, and you use sc.text() ? If memory is not enough, spark will spill 3-4x of input data
to disk.




------------------ 原始邮件 ------------------
发件人: "Corey Nolet";<cjnolet@gmail.com>;
发送时间: 2016年2月7日(星期天) 晚上8:56
收件人: "Igor Berman"<igor.berman@gmail.com>; 
抄送: "user"<user@spark.apache.org>; 
主题: Re: Shuffle memory woes



As for the second part of your questions- we have a fairly complex join process which requires
a ton of stage orchestration from our driver. I've written some code to be able to walk down
our DAG tree and execute siblings in the tree concurrently where possible (forcing cache to
disk on children that that have multiple chiildren themselves so that they can be run concurrently).
Ultimatey, we have seen significant speedup in our jobs by keeping tasks as busy as possible
processing concurrent stages. Funny enough though, the stage that is causing problems with
shuffling for us has a lot of children and doesn't even run concurrently with any other stages
so I ruled out the concurrency of the stages as a culprit for the shuffliing problem we're
seeing.

On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjnolet@gmail.com> wrote:
Igor,

I don't think the question is "why can't it fit stuff in memory". I know why it can't fit
stuff in memory- because it's a large dataset that needs to have a reduceByKey() run on it.
My understanding is that when it doesn't fit into memory it needs to spill in order to consolidate
intermediary files into a single file. The more data you need to run through this, the more
it will need to spill. My findings is that once it gets stuck in this spill chain with our
dataset it's all over @ that point because it will spill and spill and spill and spill and
spill. If I give the shuffle enough memory it won't- irrespective of the number of partitions
we have (i've done everything from repartition(500) to repartition(2500)). It's not a matter
of running out of memory on a single node because the data is skewed. It's more a matter of
the shuffle buffer filling up and needing to spill. I think what may be happening is that
it gets to a point where it's spending more time reading/writing from disk while doing the
spills then it is actually processing any data. I can tell this because I can see that the
spills sometimes get up into the 10's to 100's of TB where the input data was maybe acquireExecutionMemory
at most. Unfortunately my code is on a private internal network and I'm not able to share
it. 


On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.berman@gmail.com> wrote:
so can you provide code snippets: especially it's interesting to see what are your transformation
chain, how many partitions are there on each side of shuffle operation

the question is why it can't fit stuff in memory when you are shuffling - maybe your partitioner
on "reduce" side is not configured properly? I mean if map side is ok, and you just reducing
by key or something it should be ok, so some detail is missing...skewed data? aggregate by
key?


On 6 February 2016 at 20:13, Corey Nolet <cjnolet@gmail.com> wrote:
Igor,

Thank you for the response but unfortunately, the problem I'm referring to goes beyond this.
I have set the shuffle memory fraction to be 90% and set the cache memory to be 0. Repartitioning
the RDD helped a tad on the map side but didn't do much for the spilling when there was no
longer any memory left for the shuffle. Also the new auto-memory management doesn't seem like
it'll have too much of an effect after i've already given most the memory i've allocated to
the shuffle. The problem I'm having is most specifically related to the shuffle performing
declining by several orders of magnitude when it needs to spill multiple times (it ends up
spilling several hundred for me when it can't fit stuff into memory).






On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.berman@gmail.com> wrote:
Hi,usually you can solve this by 2 steps
make rdd to have more partitions
play with shuffle memory fraction


in spark 1.6 cache vs shuffle memory fractions are adjusted automatically


On 5 February 2016 at 23:07, Corey Nolet <cjnolet@gmail.com> wrote:
I just recently had a discovery that my jobs were taking several hours to completely because
of excess shuffle spills. What I found was that when I hit the high point where I didn't have
enough memory for the shuffles to store all of their file consolidations at once, it could
spill so many times that it causes my job's runtime to increase by orders of magnitude (and
sometimes fail altogether).


I've played with all the tuning parameters I can find. To speed the shuffles up, I tuned the
akka threads to different values. I also tuned the shuffle buffering a tad (both up and down).



I feel like I see a weak point here. The mappers are sharing memory space with reducers and
the shuffles need enough memory to consolidate and pull otherwise they will need to spill
and spill and spill. What i've noticed about my jobs is that this is a difference between
them taking 30 minutes and 4 hours or more. Same job- just different memory tuning.


I've found that, as a result of the spilling, I'm better off not caching any data in memory
and lowering my storage fraction to 0 and still hoping I was able to give my shuffles enough
memory that my data doesn't continuously spill. Is this the way it's supposed to be? It makes
it hard because it seems like it forces the memory limits on my job- otherwise it could take
orders of magnitude longer to execute.
Mime
View raw message