spark-user mailing list archives

From Andrew Ash <and...@andrewash.com>
Subject Re: Issue with sortByKey.
Date Fri, 03 Jan 2014 10:32:33 GMT
Hi Archit,

A partition is a chunk of data about the size of an HDFS block, not that of
a single key.  Because every partition is tracked individually and each is
processed in a task on one CPU core, having massive numbers of them causes
slowdowns in the scheduler and elsewhere in the system.  About how much
data are you looking at here?  If the source data for your RDDs is in
HDFS, how many HDFS blocks are required to hold the 6 RDDs?
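
A quick way to verify is to inspect the partition count directly (a
minimal sketch; the path and the SparkContext `sc` are placeholders):

    // Sketch: check how many partitions one input RDD actually has.
    // The path is a placeholder for your real input.
    val rdd = sc.sequenceFile[String, String]("hdfs:///path/to/input-1")
    println("partitions: " + rdd.partitions.size) // roughly one per HDFS block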

Andrew


On Fri, Jan 3, 2014 at 5:12 AM, Archit Thakur <archit279thakur@gmail.com> wrote:

> I saw the code of sortByKey:
>
> def sortByKey(ascending: Boolean = true, numPartitions: Int =
> self.partitions.size): RDD[P] = {
>
> It sets numPartitions = self.partitions.size, which comes from the
> getPartitions method of the RDD, if you don't specify it explicitly.
>
> In this case that will be the RDD created by step 3. Isn't that wrong?
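>
> If I wanted a different partitioning, I could of course pass it
> explicitly (a rough sketch; the RDD name `mapped` and the value 12 are
> placeholders):
>
>     // Sketch: supply numPartitions rather than relying on the
>     // self.partitions.size default. `mapped` and 12 are illustrative.
>     val sorted = mapped.sortByKey(ascending = true, numPartitions = 12)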
>
>
> On Fri, Jan 3, 2014 at 3:09 PM, Archit Thakur <archit279thakur@gmail.com> wrote:
>
>> Hi,
>>
>> I have 6 sequence files as input to my Spark code.
>> What I am doing is (roughly sketched in code below):
>> 1. Create 6 individual RDDs out of them.
>> 2. Union them.
>> 3. Do some mapping.
>> 4. Count the number of elements in the RDD.
>> 5. Then sortByKey.
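>>
>> A sketch of that pipeline (paths and the mapping function are
>> placeholders; `sc` is the SparkContext, with
>> org.apache.spark.SparkContext._ imported for the pair-RDD functions):
>>
>>     // Illustrative only: paths and the map function are made up.
>>     val inputs = (1 to 6).map(i =>
>>       sc.sequenceFile[String, String]("hdfs:///data/input-" + i)) // step 1
>>     val unioned = sc.union(inputs)                                // step 2
>>     val mapped  = unioned.map { case (k, v) => (k, v.length) }    // step 3
>>     println(mapped.count())                                       // step 4
>>     val sorted  = mapped.sortByKey()                              // step 5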
>>
>> Now, if I look at the logging:
>>
>> 14/01/03 09:04:04 INFO scheduler.DAGScheduler: Got job 0 (count at
>> PreBaseCubeCreator.scala:96) with 6 output partitions (allowLocal=false)
>>
>> This is the count step (step 4).
>>
>> Doubt 1: Why 6 output partitions?
>>
>> It then prints progress for each of them:
>>
>> 14/01/03 09:04:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager guavus-000392:52345 with 47.4 GB RAM
>> 14/01/03 09:04:08 INFO cluster.ClusterTaskSetManager: Finished TID 5 in 3938 ms on guavus-000392 (progress: 1/6)
>> 14/01/03 09:04:08 INFO scheduler.DAGScheduler: Completed ResultTask(0, 5)
>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 4 in 4211 ms on guavus-000392 (progress: 2/6)
>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 4)
>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 1 in 4221 ms on guavus-000392 (progress: 3/6)
>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
>> 14/01/03 09:04:10 INFO cluster.ClusterTaskSetManager: Finished TID 0 in 5581 ms on guavus-000392 (progress: 4/6)
>> 14/01/03 09:04:10 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
>> 14/01/03 09:04:12 INFO cluster.ClusterTaskSetManager: Finished TID 3 in 7830 ms on guavus-000392 (progress: 5/6)
>> 14/01/03 09:04:12 INFO scheduler.DAGScheduler: Completed ResultTask(0, 3)
>> 14/01/03 09:04:20 INFO cluster.ClusterTaskSetManager: Finished TID 2 in 15786 ms on guavus-000392 (progress: 6/6)
>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 2)
>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Stage 0 (count at PreBaseCubeCreator.scala:96) finished in 16.320 s
>> 14/01/03 09:04:20 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool
>> 14/01/03 09:04:20 INFO spark.SparkContext: Job finished: count
>>
>> After that, when it goes to sortByKey:
>>
>> 14/01/03 09:04:33 INFO scheduler.DAGScheduler: Got job 2 (sortByKey at PreBaseCubeCreator.scala:98) with 6 output partitions (allowLocal=false)
>>
>> However, it should have been n output partitions, where n is the
>> number of unique keys in the RDD, shouldn't it?
>>
>> Thanks and Regards,
>> Archit Thakur.
>>
>
>
