spark-user mailing list archives

From Aureliano Buendia <buendia...@gmail.com>
Subject Re: the spark worker assignment Question?
Date Tue, 07 Jan 2014 18:12:45 GMT
On Tue, Jan 7, 2014 at 6:04 PM, Andrew Ash <andrew@andrewash.com> wrote:

> I think that would do what you want.  I'm guessing in "..." you have an
> rdd and then call .collect on it -- normally this would be a bad idea
> because of large data sizes, but if you KNOW that it's small then you can
> force it through just that one machine.
>

This is what I'm doing:

val smallInput = sc.textFile("input")
val smallInputBroadcast = sc.broadcast(smallInput.collect())
sc.parallelize(Range.Int(0, smallInputBroadcast.value.length,
System.getProperty("spark.cores.max").toInt))

Each worker needs to have access to full smallInput in later stages, so I'm
sending the whole small dataset by broadcast to avoid future network
shuffling. Then, each worker is assigned a certain part of smallInput by
parallelize() to generate the bigger dataset.
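
To make the intent concrete, here is a minimal sketch of the whole pattern. It
assumes the goal is one index per input line, spread over numSlices partitions.
The expand() helper and the BroadcastSketch object are hypothetical stand-ins
for the real transformation, which is not shown here, and the code has not been
run against a cluster:

```scala
import org.apache.spark.SparkContext

object BroadcastSketch {
  // Hypothetical expansion step: pairs one line with every line of the
  // small input. The real transformation would go here instead.
  def expand(line: String, all: Array[String]): Seq[String] =
    all.map(other => line + "\t" + other)

  def run(sc: SparkContext, numSlices: Int): Long = {
    val smallInput = sc.textFile("input")
    // Assumes the input is small enough to collect on the driver.
    val smallInputBroadcast = sc.broadcast(smallInput.collect())

    // One index per input line, split into numSlices partitions so the
    // expansion is not limited by the single-block input.
    val indices =
      sc.parallelize(0 until smallInputBroadcast.value.length, numSlices)

    // Each task reads the full small dataset from the broadcast variable
    // and expands only the lines whose indices it was assigned.
    val big = indices.flatMap { i =>
      val all = smallInputBroadcast.value
      expand(all(i), all)
    }
    big.count()
  }
}
```

Here numSlices would be something like
System.getProperty("spark.cores.max").toInt, as in the snippet above.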

Is this an idiomatic way to do it in Spark, or should it be done another way?


>
>
> On Tue, Jan 7, 2014 at 9:20 AM, Aureliano Buendia <buendia360@gmail.com> wrote:
>
>>
>>
>>
>> On Tue, Jan 7, 2014 at 5:13 PM, Andrew Ash <andrew@andrewash.com> wrote:
>>
>>> If small-file is hosted in HDFS I think the default is one partition per
>>> HDFS block. If it fits in one block (64MB each by default), that
>>> might be one partition.
>>>
>> So if I want to parallelize processing that small file (which only fits
>> in one block) over 100 machines, instead of calling:
>>
>> sc.parallelize(..., smallInput.partitions.length)
>>
>> should I call?:
>>
>> sc.parallelize(..., System.getProperty("spark.cores.max").toInt)
>>
>>
>>> Sent from my mobile phone
>>> On Jan 7, 2014 8:46 AM, "Aureliano Buendia" <buendia360@gmail.com>
>>> wrote:
>>>
>>>>
>>>>
>>>>
>>>> On Thu, Jan 2, 2014 at 5:52 PM, Andrew Ash <andrew@andrewash.com> wrote:
>>>>
>>>>> That sounds right Mayur.
>>>>>
>>>>> Also in 0.8.1 I hear there's a new repartition method that you might
>>>>> be able to use to further distribute the data.  But if your data is so
>>>>> small that it fits in just a couple blocks, why are you using 20 machines
>>>>> just to process a quarter GB of data?
>>>>>
>>>>
>>>> Here is a use case: We could start from an extremely small file which
>>>> could be transformed into a huge in-memory dataset, then reduced to a very
>>>> small dataset.
>>>>
>>>> In a more concrete form, assume we have 100 worker machines and start
>>>> from a small input file:
>>>>
>>>> val smallInput = sc.textFile("small-input")
>>>>
>>>> In this case, would smallInput.partitions.length be a small number, or
>>>> would it be 100?
>>>>
>>>> If we do expect the next transformation to make the data significantly
>>>> bigger, how can we force it to be processed over the 100 machines?
>>>>
>>>>
>>>>> Is the computation on each bit extremely intensive?
>>>>>
>>>>>
>>>>> On Thu, Jan 2, 2014 at 12:39 PM, Mayur Rustagi <
>>>>> mayur.rustagi@gmail.com> wrote:
>>>>>
>>>>>> I have experienced a similar issue. The easiest fix I found was to
>>>>>> increase the replication of the data being used in the worker to the number
>>>>>> of workers you want to use for processing. The RDD seems to be created on all
>>>>>> the machines where the blocks are replicated. Please correct me if I am
>>>>>> wrong.
>>>>>>
>>>>>> Regards
>>>>>> Mayur
>>>>>>
>>>>>> Mayur Rustagi
>>>>>> Ph: +919632149971
>>>>>> http://www.sigmoidanalytics.com
>>>>>> https://twitter.com/mayur_rustagi
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 2, 2014 at 10:46 PM, Andrew Ash <andrew@andrewash.com> wrote:
>>>>>>
>>>>>>> Hi lihu,
>>>>>>>
>>>>>>> Maybe the data you're accessing is in HDFS and only resides on 4
>>>>>>> of your 20 machines because it's only about 4 blocks (at the default 64MB /
>>>>>>> block that's around a quarter GB).  Where is your source data located and
>>>>>>> how is it stored?
>>>>>>>
>>>>>>> Andrew
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 2, 2014 at 7:53 AM, lihu <lihu723@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>    I run Spark on a cluster with 20 machines, but when I start an
>>>>>>>> application using the spark-shell, only 4 machines are working. The
>>>>>>>> others are just idle, with no memory or CPU in use. I watched this through
>>>>>>>> the web UI.
>>>>>>>>
>>>>>>>>    I wondered whether the other machines might be busy, so I checked them
>>>>>>>> using the "top" and "free" commands, but they are not.
>>>>>>>>
>>>>>>>>   * So I just wonder why Spark does not assign work to all
>>>>>>>> 20 machines? This is not good resource usage.*
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
