Ah, ok. Thanks for the clarification. 

When I create a file that is visible only on the master, I get the following error:
f.map(l=>l.split(" ")).collect
13/10/03 20:38:48 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/10/03 20:38:48 WARN snappy.LoadSnappy: Snappy native library not loaded
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/root/ue/onlymaster
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)

On Thu, Oct 3, 2013 at 12:34 PM, Mark Hamstra <mark@clearstorydata.com> wrote:
But the worker has to be on a node that has local access to the file.

On Thu, Oct 3, 2013 at 12:30 PM, Shay Seng <shay@1618labs.com> wrote:
Ok, even if my understanding of allowLocal is incorrect, nevertheless:
(1) I'm loading a local file.
(2) The tasks appear to be getting executed on a slave node (ip-10-129-25-28), which is not my master node.

On Thu, Oct 3, 2013 at 12:22 PM, Mark Hamstra <mark@clearstorydata.com> wrote:
No, that is not what allowLocal means.  For a very few actions, the DAGScheduler will run the job locally (in a separate thread on the master node) if the RDD in the action has a single partition and no dependencies in its lineage.  If allowLocal is false, that doesn't mean that SparkContext.textFile called on a local file will magically turn that local file into a distributed file and allow more than just the node where the file is local to process that file. 
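[Editor's note: the local-run condition described above can be sketched as a simplified predicate. This is only a model of the logic as stated in this thread, not Spark's actual DAGScheduler code; all names here are hypothetical.]

```scala
// Simplified model of the check described above: a job may run locally
// (in a thread on the master) only when the caller allows it, the RDD
// has exactly one partition, and its lineage has no dependencies.
// Illustrative sketch only -- not Spark's real implementation.
object LocalRunSketch {
  final case class RddInfo(numPartitions: Int, numDependencies: Int)

  def runsLocally(allowLocal: Boolean, rdd: RddInfo): Boolean =
    allowLocal && rdd.numPartitions == 1 && rdd.numDependencies == 0

  def main(args: Array[String]): Unit = {
    // A two-partition collect with allowLocal=false is always distributed:
    assert(!runsLocally(allowLocal = false, RddInfo(2, 1)))
    // Even with allowLocal=true, multiple partitions force a distributed run:
    assert(!runsLocally(allowLocal = true, RddInfo(2, 0)))
    // Only a single-partition, dependency-free RDD may run on the master:
    assert(runsLocally(allowLocal = true, RddInfo(1, 0)))
  }
}
```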

On Thu, Oct 3, 2013 at 11:05 AM, Shay Seng <shay@1618labs.com> wrote:

On Wed, Oct 2, 2013 at 1:00 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
Hi Shangyu,

(1) When we read in a local file with SparkContext.textFile and run a map/reduce job on it, how does Spark decide which worker node to send the data to? Will the data be divided/partitioned equally according to the number of worker nodes, with each worker node getting one piece of the data?
You actually can't run distributed jobs on local files. The local file URL only works on the same machine, or if the file is in a filesystem that's mounted on the same path on all worker nodes.
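[Editor's note: a toy model of the point above: each executor resolves a file:// path against its *own* local filesystem, so a read succeeds only on hosts that actually have the file at that path. This would explain both sessions in this thread, assuming ue_env.sh was also present on the worker (e.g. synced there), while onlymaster was not. Hostnames and helper names are illustrative, not Spark internals.]

```scala
// Toy model: a file:// path must exist at the same path on every host
// that runs a task, because each executor opens its own local filesystem.
object LocalPathSketch {
  // host -> set of paths present on that host's local filesystem
  // (assumed layout, matching the behavior reported in this thread)
  val filesOnHost: Map[String, Set[String]] = Map(
    "master"          -> Set("/root/ue/onlymaster", "/root/ue/ue_env.sh"),
    "ip-10-129-25-28" -> Set("/root/ue/ue_env.sh") // worker lacks onlymaster
  )

  def canRead(host: String, path: String): Boolean =
    filesOnHost.getOrElse(host, Set.empty).contains(path)

  def main(args: Array[String]): Unit = {
    // ue_env.sh exists at the same path on both machines, so tasks
    // scheduled on the worker can read it and the job succeeds:
    assert(filesOnHost.keys.forall(canRead(_, "/root/ue/ue_env.sh")))
    // onlymaster exists only on the master, so a task on the worker
    // fails with "Input path does not exist":
    assert(!canRead("ip-10-129-25-28", "/root/ue/onlymaster"))
  }
}
```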

Is this really true? 

scala> val f = sc.textFile("/root/ue/ue_env.sh")
13/10/03 17:55:45 INFO storage.MemoryStore: ensureFreeSpace(34870) called with curMem=34870, maxMem=4081511301
13/10/03 17:55:45 INFO storage.MemoryStore: Block broadcast_1 stored as values to memory (estimated size 34.1 KB, free 3.8 GB)
f: spark.RDD[String] = MappedRDD[5] at textFile at <console>:12

scala> f.map(l=>l.split(" ")).collect
13/10/03 17:55:51 INFO mapred.FileInputFormat: Total input paths to process : 1
13/10/03 17:55:51 INFO spark.SparkContext: Starting job: collect at <console>:15
13/10/03 17:55:51 INFO scheduler.DAGScheduler: Got job 2 (collect at <console>:15) with 2 output partitions (allowLocal=false)
13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:0 as TID 4 on executor 0: ip-10-129-25-28 (preferred)
13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:0 as 1517 bytes in 3 ms
13/10/03 17:55:51 INFO cluster.TaskSetManager: Starting task 2.0:1 as TID 5 on executor 0: ip-10-129-25-28 (preferred)
13/10/03 17:55:51 INFO cluster.TaskSetManager: Serialized task 2.0:1 as 1517 bytes in 0 ms

Doesn't allowLocal=false mean the job is getting distributed to workers rather than computed locally?



Any help will be appreciated.


Shangyu, Luo
Department of Computer Science
Rice University