spark-user mailing list archives

From Maciej Sokołowski <matemac...@gmail.com>
Subject Re: Splitting RDD to exact number of partitions
Date Tue, 31 May 2016 14:25:01 GMT
Thanks.

Under what conditions can the number of partitions be higher than minPartitions
when reading with textFile? Should this be considered an infrequent situation?
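
One likely source of extra partitions is the split arithmetic in Hadoop's old-API FileInputFormat, which sc.textFile uses underneath. The following is a simplified sketch of that arithmetic for a single splittable file, not Spark or Hadoop code: the names SplitCount, countSplits, fileBytes, blockBytes and minSplitBytes are made up for illustration, and the 1.1 slop factor and the max/min formula are reproduced from memory of the Hadoop source, so treat them as assumptions to verify against your Hadoop version.

```scala
// Simplified model of how a single splittable file is cut into input splits.
// Assumption: mirrors Hadoop's old-API FileInputFormat.getSplits; names are hypothetical.
object SplitCount {
  // A trailing remainder of up to 10% over splitSize is folded into the last split.
  val SplitSlop = 1.1

  def countSplits(fileBytes: Long,
                  minPartitions: Int,
                  blockBytes: Long,
                  minSplitBytes: Long = 1L): Int = {
    require(minSplitBytes >= 1, "minSplitBytes must be at least 1")
    // Integer division: rounding down here is what can push the count above minPartitions.
    val goalSize = fileBytes / math.max(minPartitions, 1)
    // The split size is capped by the block size and floored by the minimum split size.
    val splitSize = math.max(minSplitBytes, math.min(goalSize, blockBytes))
    var remaining = fileBytes
    var splits = 0
    while (remaining.toDouble / splitSize > SplitSlop) {
      splits += 1
      remaining -= splitSize
    }
    if (remaining != 0) splits += 1 // the leftover bytes form the final split
    splits
  }
}

object SplitCountDemo extends App {
  val bigBlock = 32L * 1024 * 1024
  println(SplitCount.countSplits(1280L, 128, bigBlock)) // 128: size divides evenly
  println(SplitCount.countSplits(1000L, 128, bigBlock)) // 143: goalSize rounds down to 7
  println(SplitCount.countSplits(1000L, 2, 100L))       // 10: block size caps the split size
}
```

Under this model, more than minPartitions splits appear whenever the file size is not close to a multiple of minPartitions, or whenever the block size is smaller than fileBytes / minPartitions, so the situation need not be rare.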

To sum up: is there a more efficient way to ensure an exact number of
partitions than the following:

val rdd = sc.textFile("perf_test1.csv", minPartitions = 128)
// repartition returns a new RDD, so the result has to be kept
val exact = if (rdd.getNumPartitions > 128) rdd.repartition(128) else rdd

?


On 31 May 2016 at 16:11, Takeshi Yamamuro <linguin.m.s@gmail.com> wrote:

> `coalesce` without shuffling cannot produce more partitions than its
> parent RDD.
> As Ted said, you need to set shuffle to true, or you need to use
> `RDD#repartition`.
>
> // maropu
>
> On Tue, May 31, 2016 at 11:02 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>
>> Value for shuffle is false by default.
>>
>> Have you tried setting it to true?
>>
>> Which Spark release are you using?
>>
>> On Tue, May 31, 2016 at 6:13 AM, Maciej Sokołowski <matemaciek@gmail.com>
>> wrote:
>>
>>> Hello Spark users and developers.
>>>
>>> I read a file and want to ensure that it has an exact number of
>>> partitions, for example 128.
>>>
>>> In the documentation I found:
>>>
>>> def textFile(path: String, minPartitions: Int = defaultMinPartitions):
>>> RDD[String]
>>>
>>> But the argument here is the minimum number of partitions, so I use
>>> coalesce to ensure the desired number of partitions:
>>>
>>> def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord:
>>> Ordering[T] = null): RDD[T]
>>> //Return a new RDD that is reduced into numPartitions partitions.
>>>
>>> So I combine them and get a lower number of partitions than expected:
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).coalesce(128).getNumPartitions
>>> res14: Int = 126
>>>
>>> Is this expected behaviour? The file contains 100000 lines; the sizes of
>>> the partitions before and after coalesce are:
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).mapPartitions{rows => Iterator(rows.length)}.collect()
>>> res16: Array[Int] = Array(782, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781)
>>>
>>> scala> sc.textFile("perf_test1.csv",
>>> minPartitions=128).coalesce(128).mapPartitions{rows =>
>>> Iterator(rows.length)}.collect()
>>> res15: Array[Int] = Array(1563, 781, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 781, 782, 781, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 782, 781, 781, 781, 781, 1563, 782, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 782,
>>> 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782)
>>>
>>> So two partitions are double the size. Is this expected behaviour or is
>>> it some kind of bug?
>>>
>>> Thanks,
>>> Maciej Sokołowski
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>
