spark-user mailing list archives

From Takeshi Yamamuro <linguin....@gmail.com>
Subject Re: Splitting RDD to exact number of partitions
Date Tue, 31 May 2016 14:11:54 GMT
`coalesce` without shuffling can only produce fewer partitions than its parent
RDD has.
As Ted said, you need to set `shuffle` to true, or you need to use
`RDD#repartition`.
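For example (a sketch, assuming a spark-shell session with the usual `sc` SparkContext and the same `perf_test1.csv` file from below):

```scala
// Read the file; minPartitions is only a lower bound on the partition count.
val rdd = sc.textFile("perf_test1.csv", minPartitions = 128)

// coalesce(n) with shuffle = false (the default) can only merge partitions,
// so it may return fewer than n and cannot split oversized ones.
val merged = rdd.coalesce(128)

// Passing shuffle = true redistributes the data and yields exactly 128 partitions.
val exact = rdd.coalesce(128, shuffle = true)

// RDD#repartition(n) is shorthand for coalesce(n, shuffle = true).
val same = rdd.repartition(128)
```

Both of the last two trigger a full shuffle, which costs more than a shuffle-free coalesce but guarantees the requested partition count and an even distribution.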

// maropu

On Tue, May 31, 2016 at 11:02 PM, Ted Yu <yuzhihong@gmail.com> wrote:

> The value of `shuffle` is false by default.
>
> Have you tried setting it to true ?
>
> Which Spark release are you using ?
>
> On Tue, May 31, 2016 at 6:13 AM, Maciej Sokołowski <matemaciek@gmail.com>
> wrote:
>
>> Hello Spark users and developers.
>>
>> I read a file and want to ensure that it has an exact number of partitions,
>> for example 128.
>>
>> In the documentation I found:
>>
>> def textFile(path: String, minPartitions: Int = defaultMinPartitions):
>> RDD[String]
>>
>> But the argument here is the minimum number of partitions, so I use coalesce
>> to ensure the desired number of partitions:
>>
>> def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord:
>> Ordering[T] = null): RDD[T]
>> //Return a new RDD that is reduced into numPartitions partitions.
>>
>> So I combine them and get a number of partitions lower than expected:
>>
>> scala> sc.textFile("perf_test1.csv",
>> minPartitions=128).coalesce(128).getNumPartitions
>> res14: Int = 126
>>
>> Is this expected behaviour? The file contains 100000 lines; the sizes of the
>> partitions before and after coalesce are:
>>
>> scala> sc.textFile("perf_test1.csv",
>> minPartitions=128).mapPartitions{rows => Iterator(rows.length)}.collect()
>> res16: Array[Int] = Array(782, 781, 782, 781, 781, 782, 781, 781, 781,
>> 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>> 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782,
>> 781, 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782,
>> 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781,
>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>> 781, 782, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>> 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781,
>> 781, 782, 781, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781)
>>
>> scala> sc.textFile("perf_test1.csv",
>> minPartitions=128).coalesce(128).mapPartitions{rows =>
>> Iterator(rows.length)}.collect()
>> res15: Array[Int] = Array(1563, 781, 781, 781, 782, 781, 781, 781, 781,
>> 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 781, 782, 781, 781, 781,
>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>> 781, 782, 781, 781, 781, 781, 1563, 782, 781, 781, 782, 781, 781, 781, 781,
>> 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>> 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>> 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>> 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 782,
>> 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782)
>>
>> So two of the partitions are double the size. Is this expected behaviour or
>> is it some kind of bug?
>>
>> Thanks,
>> Maciej Sokołowski
>>
>
>


-- 
---
Takeshi Yamamuro
