spark-user mailing list archives

From Andras Nemeth <andras.nem...@lynxanalytics.com>
Subject Re: understanding stages
Date Thu, 24 Apr 2014 10:26:25 GMT
On Thu, Apr 24, 2014 at 3:44 AM, Cheng Lian <lian.cs.zju@gmail.com> wrote:

> Sorting in Spark is somewhat similar to TeraSort: sortByKey uses
> RangePartitioner to shuffle data into roughly sorted ranges first, and
> RangePartitioner runs a separate job to sample the data to figure out a
> rough range distribution. So there are actually three stages generated by
> sortByKey:
>
>    1. the stage in the lightweight sample job,
>    2. mapper side stage of the shuffle, and
>    3. the reducer side of the shuffle.
>
> That's why three sortByKey stages appear in the web UI. Please
> refer to OrderedRDDFunctions.scala and RangePartitioner.rangeBounds in
> Partitioner.scala for details.
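The sampling step described above can be illustrated with a rough pure-Python sketch. This is not Spark's actual implementation (the real RangePartitioner.rangeBounds in Partitioner.scala uses weighted reservoir sampling per partition); the function name and parameters here are made up purely for illustration:

```python
import random

def range_bounds(keys, num_partitions, sample_size=100, seed=42):
    """Illustrative only: pick num_partitions - 1 split points
    from a random sample of the keys."""
    random.seed(seed)
    sample = sorted(random.sample(keys, min(sample_size, len(keys))))
    step = len(sample) / num_partitions
    # Evenly spaced elements of the sorted sample become partition bounds.
    return [sample[int(step * i)] for i in range(1, num_partitions)]

keys = list(range(1000))
bounds = range_bounds(keys, num_partitions=4)
# Each key is then shuffled to the partition whose range contains it,
# so the output partitions come out in globally sorted order.
```

This sampling pass is the separate lightweight job (stage 1 above); the shuffle that uses the resulting bounds accounts for stages 2 and 3.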
>
This makes sense. But regarding #3 above, the reducer side of the shuffle:
shouldn't that be pipelined together with the take into a single stage?

Thanks,
Andras


> On Thu, Apr 24, 2014 at 3:06 AM, Diana Carroll <dcarroll@cloudera.com>wrote:
>
>> Hello awesome Sparksters.
>>
>> I'm trying to gain a greater understanding of how stages are calculated
>> by looking at the Spark App UI Stages tab, but I'm seeing things I don't
>> really get.
>>
>> Here's what my sample code is doing:
>> - read in 66 XML files into an RDD (66 partitions)
>> - use mapPartition to parse each file into several XML elements
>> - use map to extract the one piece of data I'm looking for out of each
>> element
>> - use map and reduce to count up how many of each item I have
>> - sort the results to show the most common items
>>
>> activations = sc.textFile(filename)
>> activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
>> models = activationTrees.map(lambda activation: getmodel(activation))
>> topmodels = models.map(lambda model: (model,1))\
>>     .reduceByKey(lambda v1,v2: v1+v2)\
>>     .map(lambda (model,count): (count,model))\
>>     .sortByKey(ascending=False)
>> for (count,model) in topmodels.take(n):
>>     print "Model %s (%s)" % (model,count)
>>
>> I expected when I looked at the executed stages to see three stages:
>> reduceByKey (on 66 partitions)
>> sortByKey (on some smaller number of partitions)
>> take
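The expectation above reflects Spark's usual rule: consecutive narrow transformations are pipelined into one stage, and a new stage begins at each shuffle (wide) dependency. A toy sketch (plain Python, not Spark's scheduler; the dependency labels are assumptions about this particular lineage) of cutting the lineage at shuffle boundaries:

```python
# Toy illustration: a stage ends wherever a wide (shuffle) dependency
# appears; narrow transformations are pipelined into the same stage.
lineage = [
    ("textFile", "narrow"),
    ("mapPartitions", "narrow"),
    ("map", "narrow"),
    ("reduceByKey", "wide"),
    ("map", "narrow"),
    ("sortByKey", "wide"),
    ("take", "narrow"),
]

def split_into_stages(lineage):
    stages, current = [], []
    for op, dep in lineage:
        current.append(op)
        if dep == "wide":
            # The shuffle closes the current stage.
            stages.append(current)
            current = []
    if current:
        stages.append(current)
    return stages

stages = split_into_stages(lineage)
# → three stages: up through reduceByKey, up through sortByKey, then take
```

This toy model reproduces the three-stage expectation, but it does not account for the extra sampling job that sortByKey runs, which is where the additional stage in the UI comes from.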
>>
>> What I actually see is:
>>
>> [image: Inline image 1]
>>
>> That is, sortByKey appears to be running three times.
>>
>> Why is that?
>>
>> Thanks,
>> Diana
>>
