spark-user mailing list archives

From Thunder Stumpges <thunder.stump...@gmail.com>
Subject Re: RDD Partitions not distributed evenly to executors
Date Tue, 22 Nov 2016 02:19:51 GMT
Has anyone figured this out yet!? I have gone looking for this exact
problem (Spark 1.6.1) and I cannot get my partitions to be distributed
evenly across executors no matter what I've tried. It has been mentioned
several other times in the user group as well as the dev group (initially
by Mike Hynes, and by a few others I found).

It seems this has been a known issue for the better part of 6 months. Hard
to believe there has been no real progress.

Has anyone got a workaround or something that can spread the cached RDD
partitions evenly across executors?

This is having a major performance impact on my Spark Streaming application
which is extremely imbalanced currently.

Thanks in advance!
Thunder


On Wed, Apr 6, 2016 at 6:36 AM Mike Hynes <91mbbh@gmail.com> wrote:

> Hello All (and Devs in particular),
>
> Thank you again for your further responses. Please find a detailed
> email below which identifies (I believe) the cause of the partition
> imbalance problem, which occurs in Spark 1.5, 1.6, and a 2.0-SNAPSHOT.
> This is followed by questions for members of the dev community with
> more intimate knowledge of the scheduler, so that they may confirm my
> guess at the cause and provide insight into how best to avoid the
> problem.
>
> Attached to this email are Gantt-chart plots which show the task
> execution over elapsed time in a Spark program. This program was meant
> to investigate the simplest possible vector operation for block-vector
> data stored in RDDs of type RDD[(Int,Vector)]. In the Gantt plots,
> you'll see the tasks shown as horizontal lines along the x axis, which
> shows elapsed time. The shaded regions represent a single executor
> such that all tasks managed by a single executor lie in a contiguous
> shaded region. The executors all managed 16 cores on 4 different
> compute nodes, and the tasks have been sorted and fit into 16 slots
> for each executor according to their chronological order, as determined
> by the task information in the event log for the program, such that
> the y-axis corresponds essentially to the unique core id, ranging from
> 1 to 64. The numbers running horizontally along the top of these plots
> are the stage numbers, as determined by the DAG scheduler.
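The slot assignment described above (packing each executor's tasks into 16 rows in chronological order) can be sketched as follows. This is a hypothetical reconstruction, not the script used for the attached plots; it assumes each task's executor ID, launch time and finish time have already been pulled out of the event log into plain tuples, and the name `assign_rows` is illustrative.

```python
from typing import Dict, List, Sequence, Tuple

# One task record: (executor_id, launch_time, finish_time).
Task = Tuple[str, float, float]


def assign_rows(tasks: Sequence[Task], executors: Sequence[str],
                slots_per_executor: int) -> List[int]:
    """Map each task to a 1-based row (core id) for a Gantt plot.

    Tasks are visited in order of launch time; each one is placed in the
    first slot of its own executor that is already free at launch, so all
    tasks of one executor occupy one contiguous band of rows.
    """
    # For every executor, the time at which each of its slots becomes free.
    free_at: Dict[str, List[float]] = {
        e: [float("-inf")] * slots_per_executor for e in executors
    }
    rows = [0] * len(tasks)
    for i in sorted(range(len(tasks)), key=lambda k: tasks[k][1]):
        executor, launch, finish = tasks[i]
        slots = free_at[executor]
        # Raises StopIteration if more tasks overlap than the executor has slots.
        slot = next(s for s, t in enumerate(slots) if t <= launch)
        slots[slot] = finish
        rows[i] = executors.index(executor) * slots_per_executor + slot + 1
    return rows
```

With 4 executors and 16 slots each, the returned rows range from 1 to 64, matching the y-axis of the plots.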
>
> In the program itself, two block vectors, v_1 and v_2, were created
> and copartitioned, cached, and then added together elementwise through
> a join operation on their block index keys. Stages 0 and 1 correspond
> to the map and count operations to create v_1; stages 2 and 3
> correspond to the same operations on v_2; and stages 6 through 15
> consist of identical count operations to materialize the vector v =
> v_1 + v_2, formed through a join on v_1 and v_2. The vectors v_1 and
> v_2 were initialized by first creating the keys using a
> sc.parallelize{0 to num_blocks - 1} operation, after which the keys
> were partitioned with a HashPartitioner (note that a dummy map
> {k => (k,k)} was first applied to the keys so that the HashPartitioner
> could be used; the motivation for this was that, for large block vector
> RDDs, it would be better to hash partition the keys before generating
> the data). The size of each vector is the fixed vector block size (the
> size of each sub-block) times the number of blocks per partition times
> the number of partitions, which is itself an integer multiple of the
> number of cores. In other words, each partition has \gamma blocks,
> there are \alpha partitions per core, and each block has size 2^16.
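The sizing rule and the key partitioning above can be checked with a small sketch. It assumes 64 cores in total (4 nodes with 16 cores each, as in the plots) and models Spark's HashPartitioner only for Int keys, whose JVM hashCode is the value itself; the function names are hypothetical.

```python
from collections import Counter
from typing import Tuple

BLOCK_SIZE = 2 ** 16  # elements per block, as in the tests described above


def layout(alpha: int, gamma: int, cores: int) -> Tuple[int, int, int]:
    """Return (num_partitions, num_blocks, vector_length) for one run."""
    num_partitions = alpha * cores       # alpha partitions per core
    num_blocks = gamma * num_partitions  # gamma blocks per partition
    return num_partitions, num_blocks, num_blocks * BLOCK_SIZE


def hash_partition(key: int, num_partitions: int) -> int:
    """Partition index for an Int key.

    Models HashPartitioner's non-negative modulus of key.hashCode; Python's
    % already returns a non-negative result for a positive modulus.
    """
    return key % num_partitions


def blocks_per_partition(alpha: int, gamma: int, cores: int) -> Counter:
    """Count how many of the keys 0 .. num_blocks - 1 land in each partition."""
    num_partitions, num_blocks, _ = layout(alpha, gamma, cores)
    return Counter(hash_partition(k, num_partitions) for k in range(num_blocks))
```

Under these assumptions both configurations discussed in this thread (\alpha = 4, \gamma = 4 and \alpha = 1, \gamma = 16) give 1024 blocks and the same vector length of 67,108,864 elements, and every partition receives exactly \gamma keys, consistent with the report that the key distribution itself is correct.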
>
> The first plot, 02_4node_imbalance_spark1.6_run2.pdf, shows a
> representative run of the block vector addition program for \alpha =
> 4, \gamma = 4. A well-balanced partitioning would correspond to 4
> partitions per core, so that each executor manages 64 tasks.
> However, as you can see in stage 0, this does not occur: there is a
> large imbalance, where cores 46--64 have many more tasks to compute
> than the others.
>
> Observing the order of the task assignment, I believe that what is
> happening here is that, due to the initial random delay of the
> executors in responding/receiving master instructions, the driver is
> assigning more tasks to the executor whose initial wave of tasks
> finishes first. Since there is *no* data locality in stage 0 to factor
> into determining on which nodes the computation should occur, my
> understanding is that the driver will allocate the tasks greedily,
> so the initial delay is crucial to whether partitions are allocated
> evenly across the nodes. Furthermore, note that stage 2 (an
> identical vector initialization operation to stage 0) is
> well-balanced, since all of the executors completed tasks at
> approximately the same time, and hence without data locality being a
> factor, were assigned new tasks at the same rate. Also, note here that
> the individual task durations are *decreasing markedly* through stages
> 6--15 (again, all of which are identical), but that the stages are
> longer than need be due to the load imbalance of the tasks.
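The greedy-allocation hypothesis above can be illustrated with a toy model. This is not Spark's actual TaskSchedulerImpl: it assumes identical task durations, no locality preferences at all, and a fixed start delay per executor, and `greedy_assign` is a hypothetical name.

```python
import heapq
from typing import List, Sequence


def greedy_assign(num_tasks: int, task_duration: float,
                  start_delays: Sequence[float],
                  slots_per_executor: int) -> List[int]:
    """Count tasks per executor under a toy greedy scheduler.

    Each executor's slots become available after its start delay; every
    task is handed to whichever slot frees up first (ties broken by
    executor id, then slot id), with no locality preference.
    """
    free = [(delay, e, s)
            for e, delay in enumerate(start_delays)
            for s in range(slots_per_executor)]
    heapq.heapify(free)
    counts = [0] * len(start_delays)
    for _ in range(num_tasks):
        t, e, s = heapq.heappop(free)  # earliest free slot
        counts[e] += 1
        heapq.heappush(free, (t + task_duration, e, s))
    return counts
```

In this model, 256 one-unit tasks on 4 executors with 16 slots each split evenly (64 each) when all executors start together; if one executor starts 2.5 units late, it ends up with only 32 tasks while two others get 80. With 64 tasks of 10 units each, the same delay leaves the assignment even (16 each), which is consistent with the explanation given for the second plot (tasks longer than the initial executor delay).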
>
> The second plot, 02_4node_balance_longer.pdf, shows a second version
> of this same program. The code is identical; however, the command-line
> input parameters have been changed such that there were 64 partitions
> (\alpha = 1 partition per core), an identical block size of 2^16, but
> \gamma = 16 blocks per partition, i.e. fewer yet larger partitions
> such that the vector is the same size. Here, stages 0 and 2 are both
> evenly partitioned; since the tasks in these stages are longer than
> the initial executor delay, no imbalance is created. However, despite
> the better balance in partitions across the nodes, this program takes
> *longer* in total elapsed time, and the tasks do not seem to be
> getting shorter by the same proportion as in the previous test with
> more partitions.
>
> Given the above, I would like to ask the following questions:
>
> 1. Is my inference correct that the partition imbalance arises due to
> the greedy nature of the scheduler when data locality is not a factor?
>
> 2a. Why are the task times in plot 1 decreasing so dramatically, but
> not in plot 2?
> 2b. Could the decrease in time be due to just-in-time compilation?
> 2c. If so, why would the JIT occur only for the first case with many
> partitions when the same amount of computational work is to be done in
> both cases?
>
> 3. If an RDD is to be created in such a manner (i.e. initialized for,
> say, an iterative algorithm, rather than by reading data from disk or
> HDFS), what is the best practice to promote good load balancing? My
> first idea would be to create the full RDD with 2x as many partitions
> but then coalesce it down to half the number of partitions with the
> shuffle flag set to true. Would that be reasonable?
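For reference on question 3: in Spark 1.6, `coalesce(n, shuffle = true)` keys each element by a position that starts at a pseudo-random offset per input partition and then increments, and routes the elements with a HashPartitioner. The toy model below mimics only that round-robin pattern. Python's RNG differs from `java.util.Random`, so the offsets will not match Spark's, and `coalesce_with_shuffle` is a hypothetical name.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def coalesce_with_shuffle(partitions: Sequence[Sequence[T]],
                          num_out: int) -> List[List[T]]:
    """Toy model of how coalesce(n, shuffle = true) spreads elements.

    Each input partition starts at a pseudo-random position seeded by its
    index and deals its elements round-robin; the position is reduced
    modulo num_out, as a HashPartitioner does for Int keys.
    """
    out: List[List[T]] = [[] for _ in range(num_out)]
    for index, items in enumerate(partitions):
        # Start offsets here will not match Spark's; only the pattern does.
        position = random.Random(index).randrange(num_out)
        for item in items:
            position += 1
            out[position % num_out].append(item)
    return out
```

Each input partition contributes either floor or ceil of its share to every output partition, so element counts come out nearly even. This balances elements across partitions; it does not by itself decide which executor runs each resulting task, which is the placement question raised in this thread.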
>
> Thank you very much for your time, and I very much hope that someone
> from the dev community who is familiar with the scheduler may be able
> to clarify the above observations and questions.
>
> Thanks,
> Mike
>
> P.S. Koert Kuipers: neither spark-defaults.conf setting impacted the
> observed behaviour, but thank you kindly for your suggestions.
>
>
> On 4/5/16, Khaled Ammar <khaled.ammar@gmail.com> wrote:
> > I have a similar experience.
> >
> > Using 32 machines, I can see that the number of tasks (partitions)
> > assigned to executors (machines) is not even. Moreover, the
> > distribution changes every stage (iteration).
> >
> > I wonder why Spark needs to move partitions around at all; shouldn't
> > the scheduler reduce network (and other IO) overhead by avoiding such
> > relocation?
> >
> > Thanks,
> > -Khaled
> >
> >
> >
> >
> > On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers <koert@tresata.com> wrote:
> >
> >> can you try:
> >> spark.shuffle.reduceLocality.enabled=false
> >>
> >> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91mbbh@gmail.com> wrote:
> >>
> >>> Dear all,
> >>>
> >>> Thank you for your responses.
> >>>
> >>> Michael Slavitch:
> >>> > Just to be sure: Have spark-env.sh and spark-defaults.conf been
> >>> > correctly propagated to all nodes? Are they identical?
> >>> Yes; these files are stored on a shared memory directory accessible to
> >>> all nodes.
> >>>
> >>> Koert Kuipers:
> >>> > we ran into similar issues and it seems related to the new memory
> >>> > management. can you try:
> >>> > spark.memory.useLegacyMode = true
> >>> I reran the exact same code with a restarted cluster using this
> >>> modification, and did not observe any difference. The partitioning is
> >>> still imbalanced.
> >>>
> >>> Ted Yu:
> >>> > If the changes can be ported over to 1.6.1, do you mind reproducing
> >>> > the issue there?
> >>> Since the spark.memory.useLegacyMode setting did not impact my code
> >>> execution, I will have to change the Spark dependency back to earlier
> >>> versions to see if the issue persists and get back to you.
> >>>
> >>> Meanwhile, if anyone else has any other ideas or experience, please let
> >>> me know.
> >>>
> >>> Mike
> >>>
> >>> On 4/4/16, Koert Kuipers <koert@tresata.com> wrote:
> >>> > we ran into similar issues and it seems related to the new memory
> >>> > management. can you try:
> >>> > spark.memory.useLegacyMode = true
> >>> >
> >>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91mbbh@gmail.com> wrote:
> >>> >
> >>> >> [ CC'ing dev list since nearly identical questions have occurred on
> >>> >> the user list recently w/o resolution;
> >>> >> cf.:
> >>> >> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
> >>> >> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
> >>> >> ]
> >>> >>
> >>> >> Hello,
> >>> >>
> >>> >> In short, I'm reporting a problem concerning load imbalance of RDD
> >>> >> partitions across a standalone cluster. Though there are 16 cores
> >>> >> available per node, certain nodes will have >16 partitions, and some
> >>> >> will correspondingly have <16 (and even 0).
> >>> >>
> >>> >> In more detail: I am running some scalability/performance tests for
> >>> >> vector-type operations. The RDDs I'm considering are simple block
> >>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
> >>> >> are generated with a fixed number of elements given by some multiple
> >>> >> of the available cores, and subsequently hash-partitioned by their
> >>> >> integer block index.
> >>> >>
> >>> >> I have verified that the hash partitioning key distribution, as well
> >>> >> as the keys themselves, are both correct; the problem is truly that
> >>> >> the partitions are *not* evenly distributed across the nodes.
> >>> >>
> >>> >> For instance, here is a representative output for some stages and
> >>> >> tasks in an iterative program. This is a very simple test with 2
> >>> >> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
> >>> >> example stages from the stderr log are stages 7 and 9:
> >>> >> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
> >>> >> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
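The stage lines above appear to be comma-separated summaries. The sketch below assumes (this is not stated in the message) that the fields are stage id, stage name, number of tasks, and submission and completion timestamps in epoch milliseconds; `parse_stage_line` and `StageSummary` are hypothetical names.

```python
from typing import NamedTuple


class StageSummary(NamedTuple):
    stage_id: int
    name: str
    num_tasks: int
    submitted_ms: int
    completed_ms: int

    @property
    def duration_ms(self) -> int:
        # Wall-clock span of the stage, under the timestamp assumption above.
        return self.completed_ms - self.submitted_ms


def parse_stage_line(line: str) -> StageSummary:
    # Take the id from the left and the three numeric fields from the right,
    # so a comma inside the stage name would not break parsing.
    stage_id, rest = line.split(",", 1)
    name, num_tasks, submitted, completed = rest.rsplit(",", 3)
    return StageSummary(int(stage_id), name, int(num_tasks),
                        int(submitted), int(completed))
```

Under that assumption, stages 7 and 9 each ran 64 tasks and spanned 868 ms and 1208 ms respectively.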
> >>> >>
> >>> >> When counting the location of the partitions on the compute nodes from
> >>> >> the stderr logs, however, you can clearly see the imbalance. Example
> >>> >> lines are:
> >>> >> 13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196, himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
> >>> >> 13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197, himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
> >>> >> 13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198, himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
> >>> >>
> >>> >> Grep'ing the full set of above lines for each hostname, himrod-?,
> >>> >> shows the problem occurs in each stage. Below is the output, where the
> >>> >> number of partitions stored on each node is given alongside its
> >>> >> hostname as in (himrod-?,num_partitions):
> >>> >> Stage 7: (himrod-1,0) (himrod-2,64)
> >>> >> Stage 9: (himrod-1,16) (himrod-2,48)
> >>> >> Stage 12: (himrod-1,0) (himrod-2,64)
> >>> >> Stage 14: (himrod-1,16) (himrod-2,48)
> >>> >> The imbalance is also visible when the executor ID is used to count
> >>> >> the partitions operated on by executors.
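The per-host counting done above with grep can also be sketched in Python. The regular expression assumes the `Starting task ... in stage ... (TID ..., host, partition ...)` message layout shown in the example lines (other Spark versions may format it differently), and `partitions_per_host` is a hypothetical helper.

```python
import re
from collections import Counter
from typing import Iterable

# Matches the TaskSetManager "Starting task" message quoted above:
# "task <index>.<attempt> in stage <stage>.<stageAttempt> (TID <tid>, <host>, partition <p>,"
START_RE = re.compile(
    r"Starting task \d+\.\d+ in stage (\d+)\.\d+ \(TID \d+, ([^,]+), partition \d+,"
)


def partitions_per_host(lines: Iterable[str]) -> Counter:
    """Count task launches per (stage, host) from driver log lines.

    Retried or speculative attempts are counted as separate launches.
    """
    counts: Counter = Counter()
    for line in lines:
        m = START_RE.search(line)
        if m:
            counts[(int(m.group(1)), m.group(2))] += 1
    return counts
```

Fed the three example lines, this reports three launches on himrod-2 for stage 7; run over a full log, it yields per-stage (host, count) pairs like those listed above.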
> >>> >>
> >>> >> I am working off a fairly recent modification of the 2.0.0-SNAPSHOT
> >>> >> branch (but the modifications do not touch the scheduler, and are
> >>> >> irrelevant for these particular tests). Has something changed radically
> >>> >> in 1.6+ that would make a previously (<=1.5) correct configuration go
> >>> >> haywire? Have new configuration settings been added of which I'm
> >>> >> unaware that could lead to this problem?
> >>> >>
> >>> >> Please let me know if others in the community have observed this, and
> >>> >> thank you for your time,
> >>> >> Mike
> >>> >>
> >>> >>
> >>> >> ---------------------------------------------------------------------
> >>> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >>> >> For additional commands, e-mail: user-help@spark.apache.org
> >>> >>
> >>> >>
> >>> >
> >>>
> >>>
> >>> --
> >>> Thanks,
> >>> Mike
> >>>
> >>
> >>
> >
> >
> > --
> > Thanks,
> > -Khaled
> >
>
>
> --
> Thanks,
> Mike
>
