spark-user mailing list archives

From Archit Thakur <archit279tha...@gmail.com>
Subject Re: Controlling hadoop block size
Date Fri, 17 Jan 2014 08:01:03 GMT
On Thu, Jan 16, 2014 at 11:40 PM, Aureliano Buendia <buendia360@gmail.com> wrote:

>
>
>
> On Thu, Jan 16, 2014 at 11:39 AM, Archit Thakur <archit279thakur@gmail.com
> > wrote:
>
>> The command
>>
>> val rdd1 = sc.parallelize(Range(0, N, 1)) // N ~ 1e3
>> val rdd2 = rdd1.cartesian(rdd1)
>>
>>
>> will create a "CartesianRDD" which has no. of partitions = N^2 (N defined
>> by your code).
>>
>
> N^2 partitions does not seem to be correct. With N = 1,000, spark creates
> about 30,000 partitions, instead of 1,000,000. It seems other parameters,
> such as the number of cores, also affect this.
>

Now this seems interesting, because in the code I can see:

override def getPartitions: Array[Partition] = {
  // create the cross product split
  val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
  for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
    val idx = s1.index * numPartitionsInRdd2 + s2.index
    array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
  }
  array
}

so the array size is the product of the parents' partition counts,
rdd1.partitions.size * rdd2.partitions.size.

What else could control that? Possibly the number of cores, which determines the
default partition count of rdd1, but still, why 30,000 in your case? Does your
cores_per_machine * number_of_machines come to about 30,000?
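
A quick way to check where the 30,000 actually comes from is to look at the
partition counts directly in the shell. A minimal sketch (the numbers in the
comments are assumptions, not measured values):

// sc.parallelize without an explicit numSlices uses the cluster's default
// parallelism, so rdd1.partitions.size is NOT N; it is the default parallelism.
val rdd1 = sc.parallelize(Range(0, 1000, 1))   // N ~ 1e3 elements
val rdd2 = rdd1.cartesian(rdd1)
println(rdd1.partitions.size)                  // default parallelism (often the total core count)
println(rdd2.partitions.size)                  // rdd1.partitions.size squared

If rdd1.partitions.size happens to be around 173 on your cluster, roughly 30,000
partitions for the cartesian product would follow directly from the code above.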


>
>>
>> Q: Is the number of part-* files determined by rdd2.partitions.length?
>> Yes.
>>
>
> Just to confirm, is the number of tasks also the same as the number of
> partitions?
>
> I can see that the number of part-* is exactly the same as the number of
> tasks.
>
>

Now, since your operation involves a shuffle, it will have two stages, and the
number of tasks should equal the number of partitions for both stages. Can you
confirm whether it launches that many tasks twice?
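
One way to see the partition/task/file relationship is a quick sketch like the one
below (the output path is hypothetical, and rdd2 is the cartesian RDD from the
example above):

import org.apache.spark.SparkContext._         // implicits needed for saveAsSequenceFile

// Each partition of rdd2 becomes one task in the save stage and one part-* file.
rdd2.saveAsSequenceFile("/tmp/cartesian-out")  // hypothetical output path
// The number of part-* files under /tmp/cartesian-out should equal
// rdd2.partitions.size, and the web UI should show that many tasks for the save stage.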


>> Q: Is there a way to keep the size of each part-* file a constant (eg 64
>> MB) regardless of other parameters, including number of available cores and
>> scheduled tasks?
>> Let's say your RDD has N MB of data.
>> You can create your own CustomRDD and, by overriding getPartitions and the
>> partitioner, create N/64 partitions and distribute the data equally (64 MB
>> each), then perform rdd.operationToTransformItInto_CustomRDD.
>>
>
> I tried to control the partition size by passing a second argument to
> sc.parallelize(), but, when I decrease the number of partitions in such a
> way that each partition size goes slightly over 3 MB, I get an Akka timeout,
> which is already set to 100 seconds.
>
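
On the 64 MB idea quoted above: instead of writing a full custom RDD, here is a
rough sketch of the same effect using coalesce (the size estimate, the path, and
the numbers below are assumptions, not a tested recipe):

import org.apache.spark.SparkContext._              // implicits for saveAsSequenceFile

// Aim for ~64 MB per part-* file by deriving the partition count from the data size.
val totalSizeMB = 4096                              // assumption: an estimate you supply yourself
val targetPartitions = math.max(1, totalSizeMB / 64)
val resized = rdd2.coalesce(targetPartitions)       // reduce the partition count (no shuffle by default)
resized.saveAsSequenceFile("/tmp/cartesian-64mb")   // hypothetical path; one part-* file per partition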

Try increasing the timeout, maybe.
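
For reference, a minimal sketch of raising it, assuming the timeout being hit is
spark.akka.timeout (default 100 seconds); the values and names below are only
examples:

import org.apache.spark.SparkContext

// Set the property before the SparkContext is created
// (or pass it via SPARK_JAVA_OPTS when launching).
System.setProperty("spark.akka.timeout", "300")            // seconds; 300 is an arbitrary example
val sc = new SparkContext("local[4]", "timeout-example")   // hypothetical master and app name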

>
>
>>
>> PS: There might be an operation/RDD that already does this; I am not aware
>> of one as of now. Please let me know if you are able to figure it out.
>>
>> Thanks and Regards,
>> Archit Thakur.
>>
>>
>> On Tue, Jan 14, 2014 at 11:51 PM, Aureliano Buendia <buendia360@gmail.com
>> > wrote:
>>
>>>
>>>
>>>
>>> On Tue, Jan 14, 2014 at 5:00 PM, Archit Thakur <
>>> archit279thakur@gmail.com> wrote:
>>>
>>>> By "Hadoop block size decreased", do you mean the HDFS block size? That is
>>>> not possible.
>>>>
>>>
>>> Sorry for the terminology mix-up. In my question, 'hadoop block size' should
>>> probably be replaced by 'number of RDD partitions'.
>>>
>>> I'm getting a large number of small files (named part-*), and I'd like
>>> to get a smaller number of larger files.
>>>
>>> I used something like:
>>>
>>> val rdd1 = sc.parallelize(Range(0, N, 1)) // N ~ 1e3
>>> val rdd2 = rdd1.cartesian(rdd1)
>>>
>>> Is the number of part-* files determined by rdd2.partitions.length?
>>>
>>> Is there a way to keep the size of each part-* file a constant (eg 64
>>> MB) regardless of other parameters, including number of available cores and
>>> scheduled tasks?
>>>
>>>
>>>> The HDFS block size is never affected by your Spark jobs.
>>>>
>>>> "For a big number of tasks, I get a very high number of 1 MB files
>>>> generated by saveAsSequenceFile()."
>>>>
>>>> What do you mean by "big number of tasks"?
>>>>
>>>> The number of files generated by saveAsSequenceFile() increases as the
>>>> number of partitions in your RDD increases.
>>>>
>>>> Are you using a custom RDD? If yes, you would have overridden the method
>>>> getPartitions - check that.
>>>> If not, you might have used an operation where you specify a partitioner
>>>> or the number of output partitions, e.g. groupByKey() - check that.
>>>>
>>>> "How is it possible to control the block size by spark?" Do you mean
>>>> "How is it possible to control the output partitions of an RDD?"
>>>>
>>>>
>>>> On Tue, Jan 14, 2014 at 7:59 AM, Aureliano Buendia <
>>>> buendia360@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Does the output Hadoop block size depend on the number of Spark tasks?
>>>>>
>>>>> In my application, when the number of tasks increases, the Hadoop block
>>>>> size decreases. For a big number of tasks, I get a very high number of
>>>>> 1 MB files generated by saveAsSequenceFile().
>>>>>
>>>>> How is it possible to control the block size from Spark?
>>>>>
>>>>
>>>>
>>>
>>
>
