spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mike Hynes <91m...@gmail.com>
Subject Re: RDD Partitions not distributed evenly to executors
Date Wed, 06 Apr 2016 13:36:00 GMT
Hello All (and Devs in particular),

Thank you again for your further responses. Please find a detailed
email below which identifies the cause (I believe) of the partition
imbalance problem, which occurs in spark 1.5, 1.6, and a 2.0-SNAPSHOT.
This is followed by follow-up questions for the dev community with
more intimate knowledge of the scheduler so that they may confirm my
guess at the cause, and please provide insight at how best to avoid
the problem.

Attached to this email are Gantt-chart plots which show the task
execution over elapsed time in a Spark program. This program was meant
to investigate the simplest possible vector operation for block-vector
data stored in RDDs of type RDD[(Int,Vector)]. In the Gantt plots,
you'll see the tasks shown as horizontal lines along the x axis, which
shows elapsed time. The shaded regions represent a single executor
such that all tasks managed by a single executor lie in a contiguous
shaded region. The executors all managed 16 cores on 4 different
compute nodes, and the tasks have been sorted and fit into 16 slots
for each executor according their chronological order, as determined
by the task information in the event log for the program, such that
the y-axis corresponds to essentially the unique core id, ranging from
1 to 64. The numbers running horizontally at the top of these plots is
the stage number, as determined by the DAG scheduler.

In the program itself, two block vectors, v_1 and v_2, were created
and copartitioned, cached, and then added together elementwise through
a join operation on their block index keys. Stages 0 and 1 correspond
to the map and count operations to create v_1; stages 2 and 3
correspond to the same operations on v_2; and stages 6 through 15
consist of identical count operations to materialize the vector v =
v_1 + v_2, formed through a join on v_1 and v_2. The vectors v_1 and
v_2 were initialized by first creating the keys using a
sc.parallelize{0 to num_blocks - 1} operation, after which the keys
were partitioned with a HashPartitioner (note that first a dummy map
{k => (k,k)} on the keys was done so that the HashPartitioner could be
used; the motivation for this was that, for large block vector RDDs,
it was be better to hash partition the keys before generating the
data). The size of the vectors is determined as a multiple of a fixed
vector block size (size of each sub-block) times the number of
partitions, which is itself an integer multiple of the number of
cores. Furthermore, each partition has \gamma blocks. So each
partition has \gamma blocks; there are \alpha partitions per core, and
each block has size 2^16.

The first plot, 02_4node_imbalance_spark1.6_run2.pdf, shows a
representative run of the block vector addition program for \alpha =
4, \gamma = 4. A well-balanced partitioning would correspond to 4
partitions for core, such that each executor is managing 64 tasks.
However, as you can see in stage 0, this does not occur: there is a
large imbalance, where cores 46--64 have many more tasks to compute
than the others.

Observing the order of the task assignment, I believe that what is
happening here is that, due to the initial random delay of the
executors in responding/receiving master instructions, the driver is
assigning more tasks to the executor whose initial wave of tasks
finishes first. Since there is *no* data locality in stage 0 to factor
into determining on which nodes the computation should occur, my
understanding is that the driver will allocate the tasks
greedily---hence the initial delay is crucial for allocating
partitions evenly across the nodes. Furthermore, note that stage 2 (an
identical vector initialization operation to stage 0) is
well-balanced, since all of the executors completed tasks at
approximately the same time, and hence without data locality being a
factor, were assigned new tasks at the same rate. Also, note here that
the individual task durations are *decreasing markedly* through stages
6--15 (again, all of which are identical), but that the stages are
longer than need be due to the load imbalance of the tasks.

The second plot, 02_4node_balance_longer.pdf, shows a second version
of this same program. The code is identical, however the commandline
input parameters have been changed such that there were 64 partitions
(\alpha = 1 partition per core), an identical blocksize of 2^16, but
\gamma = 16 blocks per partitions---i.e. fewer yet larger partitions
such that the vector is the same size. Here, stage 0 and 2 are both
evenly partitioned; since the tasks in these stages are longer than
the initial executor delay, no imbalance is created. However, despite
the better balance in partitions across the nodes, this program takes
*longer* in total elapsed time, and the tasks do not seem to be
getting shorter by the same proportion as in the previous test with
more partitions.

Given the above, I would like to ask the following questions:

1. Is my inference correct that the partition imbalance arises due to
the greedy nature of the scheduler when data locality is not a factor?

2a. Why are the task times in plot 1 decreasing so dramatically, but
not in plot 2?
2b. Could the decrease in time be due to just-in-time compilation?
2c. If so, Why would the JIT occur only for the first case with many
partitions when the same amount of computational work is to be done in
both cases?

3. If an RDD is to be created in such a manner (i.e. initialized for,
say, an iterative algorithm, rather than by reading data from disk or
hdfs), what is the best practice to promote good load balancing? My
first idea would be to create the full RDD with 2x as many partitions
but then coalesce it down to half the number of partitions with the
shuffle flag set to true. Would that be reasonable?

Thank you very much for your time, and I very much hope that someone
from the dev community who is familiar with the scheduler may be able
to clarify the above observations and questions.

Thanks,
Mike

P.S. Koert Kuipers: neither spark-defaults.sh setting impacted the
observed behaviour, but thank you kindly for your suggestions.


On 4/5/16, Khaled Ammar <khaled.ammar@gmail.com> wrote:
> I have a similar experience.
>
> Using 32 machines, I can see than number of tasks (partitions) assigned to
> executors (machines) is not even. Moreover, the distribution change every
> stage (iteration).
>
> I wonder why Spark needs to move partitions around any way, should not the
> scheduler reduce network (and other IO) overhead by reducing such
> relocation.
>
> Thanks,
> -Khaled
>
>
>
>
> On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers <koert@tresata.com> wrote:
>
>> can you try:
>> spark.shuffle.reduceLocality.enabled=false
>>
>> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91mbbh@gmail.com> wrote:
>>
>>> Dear all,
>>>
>>> Thank you for your responses.
>>>
>>> Michael Slavitch:
>>> > Just to be sure:  Has spark-env.sh and spark-defaults.conf been
>>> correctly propagated to all nodes?  Are they identical?
>>> Yes; these files are stored on a shared memory directory accessible to
>>> all nodes.
>>>
>>> Koert Kuipers:
>>> > we ran into similar issues and it seems related to the new memory
>>> > management. can you try:
>>> > spark.memory.useLegacyMode = true
>>> I reran the exact same code with a restarted cluster using this
>>> modification, and did not observe any difference. The partitioning is
>>> still imbalanced.
>>>
>>> Ted Yu:
>>> > If the changes can be ported over to 1.6.1, do you mind reproducing
>>> > the
>>> issue there ?
>>> Since the spark.memory.useLegacyMode setting did not impact my code
>>> execution, I will have to change the Spark dependency back to earlier
>>> versions to see if the issue persists and get back to you.
>>>
>>> Meanwhile, if anyone else has any other ideas or experience, please let
>>> me know.
>>>
>>> Mike
>>>
>>> On 4/4/16, Koert Kuipers <koert@tresata.com> wrote:
>>> > we ran into similar issues and it seems related to the new memory
>>> > management. can you try:
>>> > spark.memory.useLegacyMode = true
>>> >
>>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91mbbh@gmail.com> wrote:
>>> >
>>> >> [ CC'ing dev list since nearly identical questions have occurred in
>>> >> user list recently w/o resolution;
>>> >> c.f.:
>>> >>
>>> >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>>> >>
>>> >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>>> >> ]
>>> >>
>>> >> Hello,
>>> >>
>>> >> In short, I'm reporting a problem concerning load imbalance of RDD
>>> >> partitions across a standalone cluster. Though there are 16 cores
>>> >> available per node, certain nodes will have >16 partitions, and some
>>> >> will correspondingly have <16 (and even 0).
>>> >>
>>> >> In more detail: I am running some scalability/performance tests for
>>> >> vector-type operations. The RDDs I'm considering are simple block
>>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>>> >> are generated with a fixed number of elements given by some multiple
>>> >> of the available cores, and subsequently hash-partitioned by their
>>> >> integer block index.
>>> >>
>>> >> I have verified that the hash partitioning key distribution, as well
>>> >> as the keys themselves, are both correct; the problem is truly that
>>> >> the partitions are *not* evenly distributed across the nodes.
>>> >>
>>> >> For instance, here is a representative output for some stages and
>>> >> tasks in an iterative program. This is a very simple test with 2
>>> >> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
>>> >> examples stages from the stderr log are stages 7 and 9:
>>> >> 7,mapPartitions at
>>> >> DummyVector.scala:113,64,1459771364404,1459771365272
>>> >> 9,mapPartitions at
>>> >> DummyVector.scala:113,64,1459771364431,1459771365639
>>> >>
>>> >> When counting the location of the partitions on the compute nodes
>>> >> from
>>> >> the stderr logs, however, you can clearly see the imbalance. Examples
>>> >> lines are:
>>> >> 13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0
(TID 196,
>>> >> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
>>> >> 13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0
(TID 197,
>>> >> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
>>> >> 13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0
(TID 198,
>>> >> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>>> >>
>>> >> Grep'ing the full set of above lines for each hostname, himrod-?,
>>> >> shows the problem occurs in each stage. Below is the output, where
>>> >> the
>>> >> number of partitions stored on each node is given alongside its
>>> >> hostname as in (himrod-?,num_partitions):
>>> >> Stage 7: (himrod-1,0) (himrod-2,64)
>>> >> Stage 9: (himrod-1,16) (himrod-2,48)
>>> >> Stage 12: (himrod-1,0) (himrod-2,64)
>>> >> Stage 14: (himrod-1,16) (himrod-2,48)
>>> >> The imbalance is also visible when the executor ID is used to count
>>> >> the partitions operated on by executors.
>>> >>
>>> >> I am working off a fairly recent modification of 2.0.0-SNAPSHOT
>>> >> branch
>>> >> (but the modifications do not touch the scheduler, and are irrelevant
>>> >> for these particular tests). Has something changed radically in 1.6+
>>> >> that would make a previously (<=1.5) correct configuration go
>>> >> haywire?
>>> >> Have new configuration settings been added of which I'm unaware that
>>> >> could lead to this problem?
>>> >>
>>> >> Please let me know if others in the community have observed this, and
>>> >> thank you for your time,
>>> >> Mike
>>> >>
>>> >> ---------------------------------------------------------------------
>>> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> >> For additional commands, e-mail: user-help@spark.apache.org
>>> >>
>>> >>
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Mike
>>>
>>
>>
>
>
> --
> Thanks,
> -Khaled
>


-- 
Thanks,
Mike

Mime
View raw message