spark-user mailing list archives

From Archit Thakur <archit279tha...@gmail.com>
Subject Re: Issue with sortByKey.
Date Fri, 03 Jan 2014 11:25:49 GMT
I realized my mistake as soon as I posted it: I actually meant groupByKey,
not sortByKey. And yes, you are right, it is consuming 6 HDFS blocks.

The issue I am facing is that when I do a groupBy, it reduces the number of
unique keys in the RDD and also modifies them.

For example:

I have a custom data structure (DS) as the key.

Below is the set of unique keys in the baseRdd:

(40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
(40^0^0[2^1380^0]6[2[18^71.68.211.98:62510][2^WP]]
(40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
(40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
(40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
(40^0^0[2^1380^1383838476]6[2[18^71.68.211.98:62498][2^WP]]
(40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
(40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
(40^0^0[2^1380^1383839119]6[2[19^128.211.178.8:33448][2^WP]]
(40^0^0[2^1380^1383839294]6[2[19^71.75.156.224:36652][2^WP]]
(40^0^0[2^1380^1383839651]6[2[18^69.133.71.57:58320][2^WP]]
(43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]


And when I do a groupBy on the RDD, it gives me:

(40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
(40^0^0[2^1380^0]6[2[18^96.27.139.59:49412][2^WP]]
(40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
(40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
(40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
(40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
(40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
(43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]


Not only has it reduced the number of keys, it has also modified them.

The groupBy operation only uses the equals method of the key class (to check
key equality), right?
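A note on the question above: hash-based grouping, which is how groupByKey combines values in Spark, uses the key's hashCode to pick a partition and a hash-map bucket, and calls equals only within that bucket. Both methods must therefore agree, and neither may change while the key is held. One possible cause of keys that are both fewer and altered (an assumption, not confirmed in this thread) is key-object reuse: Hadoop's RecordReader reuses the same Writable instance for every record, so keys read from the sequence files mentioned in the original message can all end up referencing one object if they are buffered or shuffled without being copied. The plain-Scala sketch below simulates that; `ReusableKey`, `readRecords` and the other names are hypothetical, and no Spark API is used.

```scala
// Hypothetical mutable key standing in for a Hadoop Writable that the
// reader overwrites in place. equals and hashCode depend on the mutable field.
final class ReusableKey(var id: String) {
  override def equals(other: Any): Boolean = other match {
    case k: ReusableKey => k.id == id
    case _              => false
  }
  override def hashCode: Int = id.hashCode
  override def toString: String = s"ReusableKey($id)"
  def copy(): ReusableKey = new ReusableKey(id)
}

object KeyReuseDemo {
  // Simulates a record reader that hands out the SAME key object for every
  // record, overwriting its contents each time (lazily, like an iterator).
  def readRecords(ids: Seq[String]): Iterator[(ReusableKey, Int)] = {
    val shared = new ReusableKey("")
    ids.iterator.zipWithIndex.map { case (id, i) =>
      shared.id = id
      (shared, i)
    }
  }

  // Buffer the records as read, then group: every tuple references the shared key.
  def groupedIdsWithoutCopy(ids: Seq[String]): List[String] =
    readRecords(ids).toList.groupBy(_._1).keys.map(_.id).toList.sorted

  // Copy each key as soon as it is read, before anything buffers it.
  def groupedIdsWithCopy(ids: Seq[String]): List[String] =
    readRecords(ids).map { case (k, v) => (k.copy(), v) }
      .toList.groupBy(_._1).keys.map(_.id).toList.sorted

  def main(args: Array[String]): Unit = {
    val ids = Seq("a", "b", "c", "a")
    println(groupedIdsWithoutCopy(ids)) // List(a): three distinct keys collapsed into one
    println(groupedIdsWithCopy(ids))    // List(a, b, c)
  }
}
```

This model buffers every record before grouping, so it shows the extreme case where everything collapses into a single key; when records stream through a real shuffle, the damage may be only partial. In the model, copying each key as it is read (for example with a map before grouping) keeps the keys intact.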


On Fri, Jan 3, 2014 at 4:02 PM, Andrew Ash <andrew@andrewash.com> wrote:

> Hi Archit,
>
> A partition is a chunk of data about the size of an HDFS block, not that
> of a single key.  Because every partition is tracked individually and each
> is processed in a task on one CPU core, having massive numbers of them
> causes slowdowns in the scheduler and elsewhere in the system.  About how
> much data are you looking at here?  If the sources of your RDDs are in HDFS,
> then how many HDFS blocks are required to hold the 6 RDDs?
>
> Andrew
>
>
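To illustrate Andrew's point that partitions are not per key: with hash partitioning, the default for shuffles such as groupByKey, a key's partition is its hashCode modulo the number of partitions, made non-negative, so any number of distinct keys lands in a fixed number of partitions. The sketch below is plain Scala that mirrors that rule; `HashPartitionSketch` and its methods are hypothetical names, not Spark's API, and unlike Spark's partitioner it does not handle null keys.

```scala
object HashPartitionSketch {
  // Mirrors the rule behind hash partitioning: hashCode modulo the number
  // of partitions, shifted into the range [0, numPartitions).
  // (Null keys are not handled here.)
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Group keys by the partition they would be sent to.
  def assign(keys: Seq[String], numPartitions: Int): Map[Int, Seq[String]] =
    keys.groupBy(k => partitionFor(k, numPartitions))

  def main(args: Array[String]): Unit = {
    val keys = (1 to 1000).map(i => s"key-$i")
    val parts = assign(keys, 6)
    println(s"${keys.size} distinct keys -> ${parts.size} partitions")
    parts.toSeq.sortBy(_._1).foreach { case (p, ks) =>
      println(s"partition $p: ${ks.size} keys")
    }
  }
}
```

With 1000 distinct keys and 6 partitions, each of the (at most) 6 partitions holds many keys, and each partition is processed by a single task.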
> On Fri, Jan 3, 2014 at 5:12 AM, Archit Thakur <archit279thakur@gmail.com> wrote:
>
>> I looked at the code of sortByKey:
>>
>> def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
>>
>> If you don't specify numPartitions explicitly, it defaults to
>> self.partitions.size, which comes from the getPartitions method of the RDD.
>>
>> In this case, that is the RDD created by step 3. Isn't that
>> wrong?
>>
>>
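A sketch of why sortByKey keeps the partition count: per the signature quoted above, numPartitions defaults to self.partitions.size, and the keys are split into that many contiguous, sorted key ranges rather than one partition per key. Spark does this with a range partitioner that samples the RDD to choose the split points; the plain-Scala model below is a simplified, hypothetical version (`RangePartitionSketch` is not Spark's API) that picks evenly spaced boundaries from a given sample.

```scala
object RangePartitionSketch {
  // Choose numPartitions - 1 upper bounds from a sample of keys,
  // evenly spaced in sorted order (a simplification of range partitioning).
  def boundaries[K](sample: Seq[K], numPartitions: Int)(implicit ord: Ordering[K]): Vector[K] = {
    require(sample.nonEmpty && numPartitions >= 1, "need a non-empty sample and at least one partition")
    val sorted = sample.sorted.toVector
    (1 until numPartitions).map(i => sorted(i * sorted.size / numPartitions)).toVector
  }

  // A key goes to the first partition whose upper bound is >= the key;
  // keys above every bound go to the last partition.
  def partitionFor[K](key: K, bounds: Vector[K])(implicit ord: Ordering[K]): Int = {
    val idx = bounds.indexWhere(b => ord.lteq(key, b))
    if (idx == -1) bounds.size else idx
  }

  // Count how many keys fall into each partition.
  def partitionSizes[K](keys: Seq[K], numPartitions: Int)(implicit ord: Ordering[K]): Vector[Int] = {
    val bounds = boundaries(keys, numPartitions)
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionFor(k, bounds)) += 1)
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    // 100 distinct keys, 6 partitions (the parent RDD's partition count here).
    println(partitionSizes(1 to 100, 6)) // Vector(17, 17, 17, 16, 17, 16)
  }
}
```

To get a different number of output partitions, numPartitions can be passed explicitly, for example sortByKey(true, 12) on the RDD from step 3.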
>> On Fri, Jan 3, 2014 at 3:09 PM, Archit Thakur <archit279thakur@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have 6 sequence files as input to my Spark code.
>>> What I am doing is:
>>> 1. Create 6 individual RDDs out of them.
>>> 2. Union them.
>>> 3. Apply some mapping.
>>> 4. Count the number of elements in the RDD.
>>> 5. Call sortByKey.
>>>
>>> Now, if I look at the logs:
>>>
>>> 14/01/03 09:04:04 INFO scheduler.DAGScheduler: Got job 0 (count at
>>> PreBaseCubeCreator.scala:96) with 6 output partitions (allowLocal=false)
>>>
>>> This is the count step (step 4).
>>>
>>> Question 1: why are there 6 output partitions?
>>>
>>> It then prints the progress for each of them:
>>>
>>> 14/01/03 09:04:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager guavus-000392:52345 with 47.4 GB RAM
>>> 14/01/03 09:04:08 INFO cluster.ClusterTaskSetManager: Finished TID 5 in 3938 ms on guavus-000392 (progress: 1/6)
>>> 14/01/03 09:04:08 INFO scheduler.DAGScheduler: Completed ResultTask(0, 5)
>>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 4 in 4211 ms on guavus-000392 (progress: 2/6)
>>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 4)
>>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 1 in 4221 ms on guavus-000392 (progress: 3/6)
>>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
>>> 14/01/03 09:04:10 INFO cluster.ClusterTaskSetManager: Finished TID 0 in 5581 ms on guavus-000392 (progress: 4/6)
>>> 14/01/03 09:04:10 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
>>> 14/01/03 09:04:12 INFO cluster.ClusterTaskSetManager: Finished TID 3 in 7830 ms on guavus-000392 (progress: 5/6)
>>> 14/01/03 09:04:12 INFO scheduler.DAGScheduler: Completed ResultTask(0, 3)
>>> 14/01/03 09:04:20 INFO cluster.ClusterTaskSetManager: Finished TID 2 in 15786 ms on guavus-000392 (progress: 6/6)
>>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 2)
>>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Stage 0 (count at PreBaseCubeCreator.scala:96) finished in 16.320 s
>>> 14/01/03 09:04:20 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool
>>> 14/01/03 09:04:20 INFO spark.SparkContext: Job finished: count
>>>
>>> After that, when it gets to sortByKey:
>>>
>>> 14/01/03 09:04:33 INFO scheduler.DAGScheduler: Got job 2 (sortByKey at PreBaseCubeCreator.scala:98) with 6 output partitions (allowLocal=false)
>>>
>>> However, shouldn't it have been n output partitions, where n is the
>>> number of unique keys in the RDD?
>>>
>>> Thanks and Regards,
>>> Archit Thakur.
>>>
>>
>>
>
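The first question in the original message (why the count job has 6 output partitions) follows from how the five steps compose. Assuming each of the 6 sequence files fits in one HDFS block (as confirmed at the top of the thread), each file becomes one partition; union keeps every parent partition rather than merging them; a map keeps the partition count; and count runs one task per partition, hence 6 tasks. The plain-Scala model below illustrates that arithmetic; `ToyRdd` and `PipelineSketch` are hypothetical names, not Spark's API, and the record counts are illustrative only.

```scala
// Hypothetical toy model of an RDD as a vector of partitions; not Spark's API.
final case class ToyRdd[T](partitions: Vector[Vector[T]]) {
  // union keeps every parent partition as-is, so partition counts add up
  def union(other: ToyRdd[T]): ToyRdd[T] = ToyRdd(partitions ++ other.partitions)
  // map transforms records inside each partition; partition count unchanged
  def map[U](f: T => U): ToyRdd[U] = ToyRdd(partitions.map(_.map(f)))
  // count: one "task" per partition, each counting its own records
  def countWithTasks: (Long, Int) = (partitions.map(_.size.toLong).sum, partitions.size)
}

object PipelineSketch {
  // Step 1: one RDD per input file; assume each file fits in one HDFS block,
  // so each becomes a single partition.
  def fromFile(records: Vector[String]): ToyRdd[String] = ToyRdd(Vector(records))

  // Steps 1-4: returns (record count, number of count tasks).
  def run(files: Seq[Vector[String]]): (Long, Int) = {
    val rdds = files.map(fromFile)
    val unioned = rdds.reduce(_ union _)          // step 2
    val mapped = unioned.map(r => (r, r.length))  // step 3
    mapped.countWithTasks                         // step 4
  }

  def main(args: Array[String]): Unit = {
    val files = (1 to 6).map(i => Vector.fill(i * 10)(s"rec-$i"))
    val (n, tasks) = run(files)
    println(s"count = $n using $tasks tasks") // count = 210 using 6 tasks
  }
}
```

Step 5 then inherits the same number: per the sortByKey signature quoted earlier in the thread, numPartitions defaults to the parent's 6 partitions.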
