spark-user mailing list archives

From Andrew Ash <and...@andrewash.com>
Subject Re: Issue with sortByKey.
Date Fri, 03 Jan 2014 15:32:31 GMT
It probably uses hashCode too, so make sure those two methods are in sync.
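
For reference, a minimal sketch of a key class that keeps the two in sync
(a hypothetical EventKey, not from this thread; plain Scala):

class EventKey(val id: Int, val source: String) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case that: EventKey => this.id == that.id && this.source == that.source
    case _              => false
  }
  // Derived from exactly the fields equals compares, so keys that are
  // equal always hash to the same bucket (and the same Spark partition).
  override def hashCode: Int = 31 * id + source.hashCode
}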

Sent from my mobile phone
On Jan 3, 2014 3:26 AM, "Archit Thakur" <archit279thakur@gmail.com> wrote:

> I realized my mistake as soon as I posted it. I actually meant groupByKey,
> not sortByKey. And yeah, you are right, it is consuming 6 HDFS blocks.
>
> The issue I am facing is: when I do a groupBy, it reduces the no. of unique
> keys in the RDD and also modifies them.
>
> For eg:
>
> I have a custom DS.
>
> Below is the set of unique keys in the baseRdd
>
> (40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
> (40^0^0[2^1380^0]6[2[18^71.68.211.98:62510][2^WP]]
> (40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
> (40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
> (40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
> (40^0^0[2^1380^1383838476]6[2[18^71.68.211.98:62498][2^WP]]
> (40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
> (40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
> (40^0^0[2^1380^1383839119]6[2[19^128.211.178.8:33448][2^WP]]
> (40^0^0[2^1380^1383839294]6[2[19^71.75.156.224:36652][2^WP]]
> (40^0^0[2^1380^1383839651]6[2[18^69.133.71.57:58320][2^WP]]
> (43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]
>
>
> and when I do a groupBy on the RDD, it gives me:
>
> (40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
> (40^0^0[2^1380^0]6[2[18^96.27.139.59:49412][2^WP]]
> (40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
> (40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
> (40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
> (40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
> (40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
> (43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]
>
>
> Not only has it reduced the no. of keys, but it has also modified them.
>
> The groupBy operation only uses the equals method of the key class (to
> check key equality), right?
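>
> For example, a minimal way I could test that (a sketch; LeakyKey is a
> hypothetical key whose hashCode disagrees with equals, run on a local
> master):
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._  // pair-RDD functions
>
> class LeakyKey(val id: Int, val tag: String) extends Serializable {
>   // equals ignores `tag`...
>   override def equals(other: Any): Boolean = other match {
>     case that: LeakyKey => this.id == that.id
>     case _              => false
>   }
>   // ...but hashCode includes it, breaking the equals/hashCode contract.
>   override def hashCode: Int = 31 * id + tag.hashCode
> }
>
> object GroupByKeyTest {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext("local[4]", "groupByKey-test")
>     val pairs = sc.parallelize(Seq(
>       new LeakyKey(1, "x") -> "a",  // equal to the next key per equals,
>       new LeakyKey(1, "y") -> "b")) // but hashed to a different bucket
>     // If groupByKey consulted equals only, this would print 1; the hash
>     // partitioner routes records by hashCode first, so it prints 2.
>     println(pairs.groupByKey(4).count())
>     sc.stop()
>   }
> }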
>
>
> On Fri, Jan 3, 2014 at 4:02 PM, Andrew Ash <andrew@andrewash.com> wrote:
>
>> Hi Archit,
>>
>> A partition is a chunk of data about the size of an HDFS block, not that
>> of a single key.  Because every partition is tracked individually and each
>> is processed in a task on one CPU core, having massive numbers of them
>> causes slowdowns in the scheduler and elsewhere in the system.  About how
>> much data are you looking at here?  If the source of your RDDs is in HDFS,
>> then how many HDFS blocks are required to hold the 6 RDDs?
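>>
>> A quick way to check (a sketch; the path is hypothetical): the partition
>> count of an HDFS-backed RDD tracks the number of input splits/blocks.
>>
>> val rdd = sc.sequenceFile[String, String]("hdfs:///path/to/input")
>> println(rdd.partitions.size)  // roughly one partition per HDFS block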
>>
>> Andrew
>>
>>
>> On Fri, Jan 3, 2014 at 5:12 AM, Archit Thakur <archit279thakur@gmail.com> wrote:
>>
>>> I looked at the code of sortByKey:
>>>
>>> def sortByKey(ascending: Boolean = true, numPartitions: Int =
>>> self.partitions.size): RDD[P] = {
>>>
>>> It defaults numPartitions to self.partitions.size, which comes from the
>>> getPartitions method of RDD, if you don't specify it explicitly.
>>>
>>> In this case that will be the RDD created by step (3). Isn't
>>> that wrong?
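>>>
>>> If the default is not what I want, I suppose I can always pass it
>>> explicitly -- a sketch, assuming pairs is an RDD[(K, V)] with an
>>> Ordering on K:
>>>
>>> val sorted = pairs.sortByKey(ascending = true, numPartitions = 12)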
>>>
>>>
>>> On Fri, Jan 3, 2014 at 3:09 PM, Archit Thakur <archit279thakur@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> I have 6 sequence files as input to my Spark code.
>>>> What I am doing is:
>>>> 1. Create 6 individual RDDs out of them.
>>>> 2. Union them.
>>>> 3. Then some mapping.
>>>> 4. Count the no. of elements in the RDD.
>>>> 5. Then sortByKey (sketched just below).
>>>>
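>>>> Roughly, the pipeline looks like this (a sketch; the paths and the
>>>> mapping are made up):
>>>>
>>>> import org.apache.spark.SparkContext._  // pair-RDD functions, sortByKey
>>>>
>>>> val inputs = (0 until 6).map(i =>
>>>>   sc.sequenceFile[String, String]("hdfs:///input/part-" + i)) // 1. six RDDs
>>>> val unioned = inputs.reduce(_ union _)                        // 2. union
>>>> val mapped  = unioned.map { case (k, v) => (k, v.length) }    // 3. mapping
>>>> val n       = mapped.count()                                  // 4. count
>>>> val sorted  = mapped.sortByKey()                              // 5. sortByKey
>>>>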
>>>> Now, if I look at the logging:
>>>>
>>>> 14/01/03 09:04:04 INFO scheduler.DAGScheduler: Got job 0 (count at
>>>> PreBaseCubeCreator.scala:96) with 6 output partitions (allowLocal=false)
>>>>
>>>> This is the count step (4).
>>>>
>>>> Doubt 1: Why 6 output partitions?
>>>>
>>>> It then prints progress for each of them:
>>>>
>>>> 14/01/03 09:04:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager guavus-000392:52345 with 47.4 GB RAM
>>>> 14/01/03 09:04:08 INFO cluster.ClusterTaskSetManager: Finished TID 5 in 3938 ms on guavus-000392 (progress: 1/6)
>>>> 14/01/03 09:04:08 INFO scheduler.DAGScheduler: Completed ResultTask(0, 5)
>>>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 4 in 4211 ms on guavus-000392 (progress: 2/6)
>>>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 4)
>>>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 1 in 4221 ms on guavus-000392 (progress: 3/6)
>>>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
>>>> 14/01/03 09:04:10 INFO cluster.ClusterTaskSetManager: Finished TID 0 in 5581 ms on guavus-000392 (progress: 4/6)
>>>> 14/01/03 09:04:10 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
>>>> 14/01/03 09:04:12 INFO cluster.ClusterTaskSetManager: Finished TID 3 in 7830 ms on guavus-000392 (progress: 5/6)
>>>> 14/01/03 09:04:12 INFO scheduler.DAGScheduler: Completed ResultTask(0, 3)
>>>> 14/01/03 09:04:20 INFO cluster.ClusterTaskSetManager: Finished TID 2 in 15786 ms on guavus-000392 (progress: 6/6)
>>>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 2)
>>>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Stage 0 (count at PreBaseCubeCreator.scala:96) finished in 16.320 s
>>>> 14/01/03 09:04:20 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool
>>>> 14/01/03 09:04:20 INFO spark.SparkContext: Job finished: count
>>>>
>>>> After that, when it goes to sortByKey:
>>>>
>>>> 14/01/03 09:04:33 INFO scheduler.DAGScheduler: Got job 2 (sortByKey at
>>>> PreBaseCubeCreator.scala:98) with 6 output partitions (allowLocal=false)
>>>>
>>>> However, shouldn't it have been n output partitions, where n = the
>>>> unique no. of keys in the RDD?
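>>>>
>>>> A toy check (a sketch: local master, made-up data, and
>>>> org.apache.spark.SparkContext._ imported) of how the sorted output is
>>>> laid out across partitions:
>>>>
>>>> val r = sc.parallelize(Seq(3 -> "c", 1 -> "a", 5 -> "e", 2 -> "b"), 2)
>>>> r.sortByKey(true, 2).glom().collect().foreach(p => println(p.toSeq))
>>>> // each output partition holds a contiguous range of keys,
>>>> // e.g. (1,a),(2,b) in one and (3,c),(5,e) in the other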
>>>>
>>>> Thanks and Regards,
>>>> Archit Thakur.
>>>>
>>>
>>>
>>
>
