Hello Matei,
Thank you very much for your detailed reply!


2013/10/2 Matei Zaharia <matei.zaharia@gmail.com>
Hi Shangyu,

(1) When we read in a local file with SparkContext.textFile and run some map/reduce job on it, how does Spark decide which worker node to send the data to? Will the data be divided/partitioned equally according to the number of worker nodes, so that each worker node gets one piece of the data?

You actually can't run distributed jobs on local files. The local file URL only works on the same machine, or if the file is in a filesystem that's mounted on the same path on all worker nodes.
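
For example, roughly (the master URL and paths below are made up for illustration), a file:// URL is only readable where that exact path exists, so a distributed job normally reads from HDFS or from a filesystem mounted identically on every node:

    from pyspark import SparkContext

    sc = SparkContext("spark://master:7077", "TextFileExample")

    # Works only if /shared/data/input.txt exists at this same path on every
    # worker node (e.g. an NFS mount shared by the whole cluster).
    shared_rdd = sc.textFile("file:///shared/data/input.txt")

    # The usual approach for a distributed job: put the file into HDFS first.
    hdfs_rdd = sc.textFile("hdfs://namenode:9000/data/input.txt")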

(2) If we read in data via HDFS, how will the above process work?

Spark uses the data placement information from HDFS to schedule tasks locally on each block of the file. It creates one task per block of the file by default (a block is usually 64-128 MB), though you can ask for more tasks or use RDD.coalesce() to get fewer.
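
As a rough sketch (the HDFS path and numbers here are hypothetical, and sc is an existing SparkContext), that looks like:

    # One task per HDFS block by default: a ~1 GB file with 64 MB blocks
    # would give roughly 16 partitions, and therefore 16 tasks.
    logs = sc.textFile("hdfs://namenode:9000/data/logs.txt")

    # Ask for at least 200 partitions (more, smaller tasks)...
    logs_fine = sc.textFile("hdfs://namenode:9000/data/logs.txt", 200)

    # ...or merge down to 10 partitions with RDD.coalesce().
    logs_coarse = logs.coalesce(10)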

(3) SparkContext.textFile has a parameter 'minSplits'. Is it used for dividing the data of the input file into 'minSplits' partitions? And how do we know how many partitions each worker node receives?

Yup, it's a lower bound on the number of partitions (files with more blocks might get more). The assignment to workers is not static. Spark just gives workers tasks as their previous tasks finish, so that you can get good load balancing even if some workers are faster than others.
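
A quick way to see how records end up split across partitions (just a sketch; the path is hypothetical, and mapPartitions runs one task per partition and here only counts records) is:

    data = sc.textFile("hdfs://namenode:9000/data/logs.txt", 32)  # at least 32 partitions

    # Each task returns the number of records in its partition.
    sizes = data.mapPartitions(lambda records: [sum(1 for _ in records)]).collect()
    print(sizes)  # e.g. [8120, 8054, ...]; which worker ran which task is decided at runtime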

(4) We can set spark.default.parallelism as a system property. Is this parameter applied per worker node? Say each worker node has 8 cores; if we set spark.default.parallelism=32, will each core need to deal with 4 tasks? We can also set SPARK_WORKER_INSTANCES in spark-env.sh. For the same worker node, if we set SPARK_WORKER_INSTANCES=8, SPARK_WORKER_CORES=1, and spark.default.parallelism=32, will each core still be sent 4 tasks? Will the performance of the whole system differ between these two setups?

No, this setting applies at the level of entire jobs, not per worker node. You should set it to the number of cores in your cluster, or multiply that by 2-3 to get better load balancing, though often Spark's default for this works well too.
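
In other words (the numbers below are made up, assuming a cluster of 4 workers with 8 cores each), it is one cluster-wide default for the number of reduce tasks, and individual operations can still override it:

    # 4 workers x 8 cores = 32 cores total, so a spark.default.parallelism of
    # roughly 32-96 is a sensible starting point for the whole job, not per worker.
    pairs = sc.textFile("hdfs://namenode:9000/data/logs.txt") \
              .map(lambda line: (line.split(" ")[0], 1))

    counts = pairs.reduceByKey(lambda a, b: a + b)         # uses the default parallelism
    counts_64 = pairs.reduceByKey(lambda a, b: a + b, 64)  # explicit 64 reduce tasks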

(5) Will each map/reduce job be counted as one task? For example, with
sc.parallelize([0,1,2,3]).map(lambda x: x), will there be four tasks?

A "task" means a specific thing in Spark, which is one unit of work that happens on one node. See http://spark.incubator.apache.org/docs/latest/cluster-overview.html for an overview of the terminology. The number of tasks depends on the number of partitions (blocks) in the RDD. In this case, sc.parallelize will probably create as many tasks as you have CPU cores, which is the default unless you give it another value. You can view the exact number of tasks on the job monitoring UI in Spark 0.8 (http://spark.incubator.apache.org/docs/latest/monitoring.html).
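
To make that concrete (a sketch; the default partition count depends on your cluster):

    rdd_default = sc.parallelize([0, 1, 2, 3])   # about one partition per core by default
    rdd_two = sc.parallelize([0, 1, 2, 3], 2)    # exactly 2 partitions -> 2 tasks per stage

    # Count the elements that landed in each partition; one task runs per partition.
    print(rdd_two.mapPartitions(lambda part: [sum(1 for _ in part)]).collect())  # e.g. [2, 2]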

Matei


Any help will be appreciated.
Thanks!


--
--

Shangyu, Luo
Department of Computer Science
Rice University

--
Not Just Think About It, But Do It!
--
Success is never final.
--
Losers always whine about their best