On Tue, Jan 7, 2014 at 6:04 PM, Andrew Ash <andrew@andrewash.com> wrote:
I think that would do what you want.  I'm guessing in "..." you have an rdd and then call .collect on it -- normally this would be a bad idea because of large data sizes, but if you KNOW that it's small then you can force it through just that one machine.

This is what I'm doing:

val smallInput = sc.textFile("input")
val smallInputBroadcast = sc.broadcast(smallInput.collect())
sc.parallelize(0 until smallInputBroadcast.value.length, System.getProperty("spark.cores.max").toInt)


Each worker needs access to the full smallInput in later stages, so I'm broadcasting the whole small dataset to avoid network shuffling later. Then parallelize() assigns each worker a part of smallInput, from which it generates the bigger dataset.

Is this idiomatic in Spark, or should it be done another way?
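
For reference, a minimal sketch of the broadcast-then-parallelize pattern described above. It assumes a local SparkContext, and `expand` is a hypothetical stand-in for whatever step generates the bigger dataset. The slice count is passed in explicitly instead of being read from spark.cores.max. Treat it as an illustration under those assumptions, not a recommended implementation.

```scala
import org.apache.spark.SparkContext

object BroadcastSketch {
  // Hypothetical stand-in for the step that turns one input line into many
  // records; `all` is the full small input, readable by every task.
  def expand(line: String, all: Array[String]): Seq[String] =
    all.map(other => line + "|" + other).toSeq

  // Broadcast the small input once, then spread its indices over numSlices
  // partitions so each task expands only its own share of the lines.
  def run(sc: SparkContext, lines: Array[String], numSlices: Int): Long = {
    val smallInputBroadcast = sc.broadcast(lines)
    sc.parallelize(0 until lines.length, numSlices)
      .flatMap { i =>
        val all = smallInputBroadcast.value
        expand(all(i), all)
      }
      .count()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[4]", "broadcast-sketch")
    try println(run(sc, Array("a", "b", "c"), numSlices = 4))
    finally sc.stop()
  }
}
```

The second argument to parallelize sets the number of partitions, which is what decides how many tasks can run at once.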
 


On Tue, Jan 7, 2014 at 9:20 AM, Aureliano Buendia <buendia360@gmail.com> wrote:



On Tue, Jan 7, 2014 at 5:13 PM, Andrew Ash <andrew@andrewash.com> wrote:

If small-file is hosted in HDFS, I think the default is one partition per HDFS block. If the file fits in one block (blocks are 64MB each by default), that would probably be one partition.

So if I want to parallelize processing of that small file (which fits in a single block) over 100 machines, instead of calling:

sc.parallelize(..., smallInput.partitions.length)

should I call:

sc.parallelize(..., System.getProperty("spark.cores.max").toInt)
 


On Jan 7, 2014 8:46 AM, "Aureliano Buendia" <buendia360@gmail.com> wrote:



On Thu, Jan 2, 2014 at 5:52 PM, Andrew Ash <andrew@andrewash.com> wrote:
That sounds right Mayur.

Also in 0.8.1 I hear there's a new repartition method that you might be able to use to further distribute the data.  But if your data is so small that it fits in just a couple blocks, why are you using 20 machines just to process a quarter GB of data?

Here is a use case: We could start from an extremely small file which could be transformed into a huge in-memory dataset, then reduced to a very small dataset.

In a more concrete form, assume we have 100 worker machines and start from a small input file:

val smallInput = sc.textFile("small-input")

In this case, would smallInput.partitions.length be a small number, or would it be 100?

If we expect the next transformation to make the data significantly bigger, how can we force it to be processed over all 100 machines?
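
Two ways to get more partitions than HDFS blocks, sketched under assumptions: the second argument of textFile is a minimum-partition hint (it may be rounded up by the input format), and repartition reshuffles an existing RDD into the requested number of partitions. The helper names `readWide` and `spread` are hypothetical, and the path and partition count are left to the caller.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object PartitionSketch {
  // Ask for at least `n` input partitions when reading (a hint, not a guarantee).
  def readWide(sc: SparkContext, path: String, n: Int): RDD[String] =
    sc.textFile(path, n)

  // Redistribute an existing RDD into `n` partitions; this incurs a shuffle.
  def spread[T](rdd: RDD[T], n: Int): RDD[T] =
    rdd.repartition(n)
}
```

Checking rdd.partitions.length before and after either call shows whether the partition count actually changed.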
 
Is the computation on each bit extremely intensive?


On Thu, Jan 2, 2014 at 12:39 PM, Mayur Rustagi <mayur.rustagi@gmail.com> wrote:
I have experienced a similar issue. The easiest fix I found was to increase the replication factor of the input data to the number of workers you want to use for processing. The RDD seems to be created on all the machines where the blocks are replicated. Please correct me if I am wrong.

Regards
Mayur



On Thu, Jan 2, 2014 at 10:46 PM, Andrew Ash <andrew@andrewash.com> wrote:
Hi lihu,

Maybe the data you're accessing is in HDFS and only resides on 4 of your 20 machines because it's only about 4 blocks (at the default 64MB / block that's around a quarter GB).  Where is your source data located and how is it stored?
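
The block arithmetic above can be checked with a small sketch. It assumes the 64MB default block size mentioned in this thread and takes a quarter GB to mean 256MB; `blockCount` is a hypothetical helper, not part of any Spark or HDFS API.

```scala
object BlockMath {
  // Number of HDFS blocks a file occupies: ceiling of fileBytes / blockBytes.
  def blockCount(fileBytes: Long, blockBytes: Long): Long =
    (fileBytes + blockBytes - 1) / blockBytes

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // A quarter GB at 64MB per block
    println(blockCount(256 * mb, 64 * mb)) // prints 4
  }
}
```

With one partition per block and no replication beyond those nodes, at most 4 machines would hold local data for the job.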

Andrew


On Thu, Jan 2, 2014 at 7:53 AM, lihu <lihu723@gmail.com> wrote:
Hi,
   I run Spark on a cluster of 20 machines, but when I start an application from the spark-shell, only 4 machines do any work. The others are idle, with no memory or CPU in use; I checked this through the web UI.

   I wondered whether the other machines might be busy, so I checked them with the "top" and "free" commands, but that is not the case.
 
   So I wonder why Spark does not assign work to all 20 machines. This is not good resource usage.